// client_api_consume.cpp
#include "client_common.h"
#include "common.h"
// build a consume payload for a broker API request
TankClient::broker_outgoing_payload *TankClient::build_consume_broker_req_payload(const broker_api_request *broker_req) {
enum { trace = false,
};
auto payload = new_req_payload(const_cast<broker_api_request *>(broker_req));
[[maybe_unused]] auto broker = broker_req->br;
auto b = payload->b;
auto api_req = broker_req->api_req;
if (trace) {
SLog("Building CONSUME request payload for ", broker_req->partitions_list.size(), " partitions\n");
}
TANK_EXPECT(b);
b->pack(static_cast<uint8_t>(TankAPIMsgType::Consume)); // request type
b->pack(static_cast<uint32_t>(0)); // size (will patch)
#ifdef TANK_SUPPORT_CONSUME_FLAGS
// 2022-08-18: version >= 3 includes op_flags after min_size PARTITION_PROVIDER
// (only if flags != 0)
b->pack(static_cast<uint16_t>(api_req->as.consume.flags ? 3 : 2)); // client version
#else
b->pack(static_cast<uint16_t>(2)); // client version
#endif
b->pack(broker_req->id);
b->pack(""_s8); // client identifier
b->pack(api_req->as.consume.max_wait);
b->pack(static_cast<uint32_t>(api_req->as.consume.min_size));
#ifdef TANK_SUPPORT_CONSUME_FLAGS
if (api_req->as.consume.flags) {
b->pack(api_req->as.consume.flags); // 2022-08-18 (for version >= 3)
}
#endif
auto it = broker_req->partitions_list.next;
uint8_t topics_cnt{0};
const auto topics_cnt_buf_offset = b->size();
b->pack(static_cast<uint8_t>(0));
// partitions are already grouped by topic in assign_req_partitions_to_api_req()
while (it != &broker_req->partitions_list) {
auto partition_req = containerof(request_partition_ctx, partitions_list_ll, it);
const auto topic = partition_req->topic;
b->pack(topic);
uint8_t partitions_cnt{0};
const auto partitions_cnt_buf_offset = b->size();
b->pack(static_cast<uint8_t>(0));
do {
b->pack(partition_req->partition);
b->pack(partition_req->as_op.consume.seq_num);
b->pack(static_cast<uint32_t>(partition_req->as_op.consume.min_fetch_size));
if (trace) {
SLog("Requesting ", topic, "/", partition_req->partition, ", from ", partition_req->as_op.consume.seq_num, "\n");
}
++partitions_cnt;
} while ((it = it->next) != &broker_req->partitions_list && (partition_req = switch_list_entry(request_partition_ctx, partitions_list_ll, it))->topic == topic);
*reinterpret_cast<uint8_t *>(b->data() + partitions_cnt_buf_offset) = partitions_cnt;
++topics_cnt;
}
*reinterpret_cast<uint8_t *>(b->data() + topics_cnt_buf_offset) = topics_cnt;
*reinterpret_cast<uint32_t *>(b->data() + sizeof(uint8_t)) = b->size() - sizeof(uint8_t) - sizeof(uint32_t); // patch
payload->iovecs.data[0].iov_base = b->data();
payload->iovecs.data[0].iov_len = b->size();
payload->iovecs.size = 1;
return payload;
}
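// For reference, the request wire format serialized above (field widths follow the
// pack() calls; documentation only, reconstructed from this function, not normative):
//
//   u8   msg_type           TankAPIMsgType::Consume
//   u32  size               patched after serialization
//   u16  client_version     2, or 3 when op_flags are included
//   u32  request_id
//   str8 client_id          u8 length prefix + bytes (empty here)
//   u64  max_wait
//   u32  min_size
//   [u8  op_flags]          version >= 3 only, and only when flags != 0
//   u8   topics_cnt
//   per topic:
//     str8 topic_name
//     u8   partitions_cnt
//     per partition:
//       u16 partition
//       u64 seq_num
//       u32 min_fetch_size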
uint32_t TankClient::consume(const std::vector<std::pair<topic_partition,
std::pair<uint64_t, uint32_t>>> &sources,
const uint64_t max_wait,
const uint32_t min_size,
const uint8_t op_flags) {
return consume(sources.data(), sources.size(),
max_wait,
min_size,
op_flags);
}
uint32_t TankClient::consume(const std::pair<topic_partition, std::pair<uint64_t, uint32_t>> *sources, const size_t total_sources,
const uint64_t max_wait,
const uint32_t min_size,
const uint8_t op_flags_) {
enum {
trace = false,
};
#ifdef TANK_SUPPORT_CONSUME_FLAGS
const auto op_flags = op_flags_;
#else
enum : uint8_t {
op_flags = 0u,
};
#endif
// fan-out: api_req will coordinate broker api requests
// NOTE: get_api_request() will update now_ms
// We can have the client reactor track the api request for expiration (i.e
// after max_wait or so, if the API request hasn't been processed yet and is still around,
// the reactor will explicitly abort it with a timeout error), or not. We can do that
// by passing a timeout > 0 to get_api_request().
//
// The problem with that otherwise good idea is that there are TANK client applications that may
// block for a substantial amount of time between invoking a consume method and invoking poll() - which
// is when the reactor runs and gets to check timeouts, network I/O etc - and that can result in
// the reactor aborting the API request before TANK (the service) even got to process it, just because
// too much time has passed since it was submitted via get_api_request()
// Example:
// {
// const auto req_id = tank_client.consume_from(, .. max_wait = 1000, ...);
// slow_bootstrap_function();
// for (;;) { // application main event loop
// if (!req_id) { req_id = tank_client.consume_from(); }
// tank_client.poll(1000);
// }
// }
// In this example, the application first issues a consume request, then initializes state via slow_bootstrap_function() -
// which is the recommended way to initialize state that may be persisted locally and then process all events
// that may affect that state since bootstrap commenced.
// The problem is, if slow_bootstrap_function() takes, say, over 1s, then as soon as tank_client.poll() is executed,
// the reactor will likely abort the api request via check_pending_api_responses() because too much time has passed
// and the request hasn't become ready yet.
//
// Example:
// {
// for (;;) {// application main event loop
// if (!req_id) { req_id = tank_client.consume_from(.., max_wait = 1000, ...); }
//
// tank_client.poll(8000);
//
// if (!tank_client.faults.empty()) {
// process_faults(tank_client);
// req_id = 0;
// }
//
// if (tank_client.consumed().empty()) { continue; }
//
// for (const auto &part : tank_client.consumed()) {
// for (const auto m : part.msgs()) {
// process_message(m);
// }
// }
// req_id = 0;
// }
// }
//
// In this example, process_message() may take 1s, so the next time poll() is called
// the api request will likely be aborted as well.
//
// So it's probably a good idea to not have the client runtime abort the api request,
// i.e use get_api_request() with the timeout set to 0.
// The problem with that, though, is that if the TANK node takes too long to respond (see
// below for reasons why that may happen), the client will never know, i.e
// the api request will never become ready or time out.
// Reasons include:
// - node stuck for some reason (e.g a SIGSTOP signal was sent to it)
// - network packet relay problems between the client and the node
// - other reasons
//
// For now, we are not setting an explicit timeout, but this is not ideal.
// We need something better. Perhaps we should consider how long it's been since
// the last call to poll() and extend the timeout of all pending api requests by that much?
// Not sure what the right way to deal with this is, but we need something more there.
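//
// One possible direction, sketched below with hypothetical names (last_poll_ms,
// pending_api_requests and deadline are illustrative only; nothing like this is wired in yet):
//
//   void TankClient::poll(const size_t timeout_ms) {
//       const auto idle = now_ms - last_poll_ms; // time the application spent away from the reactor
//
//       // credit back the time the reactor wasn't running, so that slow application
//       // code between poll() invocations cannot expire still-pending api requests
//       for (auto req : pending_api_requests) {
//           req->deadline += idle;
//       }
//       last_poll_ms = now_ms;
//       // ... run the reactor as usual
//   }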
#if 0
auto api_req = get_api_request(max_wait ? max_wait + 1000 : 20 * 1000);
#else
auto api_req = get_api_request(0);
#endif
std::vector<std::pair<broker *, request_partition_ctx *>> contexts;
#ifdef TANK_SUPPORT_CONSUME_FLAGS
const bool use_provider = op_flags & unsigned(ConsumeFlags::prefer_local_node);
#endif
if (trace) {
SLog(ansifmt::bold, ansifmt::color_red, ansifmt::inverse, "Will consume from ", total_sources, " sources", ansifmt::reset, ", op_flags = ", op_flags, "\n");
}
// initialize the api request for consume
api_req->as.consume.max_wait = max_wait;
api_req->as.consume.min_size = min_size;
api_req->as.consume.flags = op_flags;
api_req->type = api_request::Type::Consume;
// we could have a generic method that returns
// a decltype(contexts) and then we would
// initialize the respective request partition context, but it's not currently necessary
for (std::size_t i = 0; i < total_sources; ++i) {
const auto &it = sources[i];
auto topic = intern_topic(it.first.first);
auto partition = it.first.second;
auto req_part = get_request_partition_ctx();
broker *broker;
// if we have a leader for this partition, choose it explicitly;
// otherwise, select any broker and we'll get to update assignments later
// and potentially retry the request on another broker
//
// When we get back a response for a partition, we can use set_leader() to assign
// that broker as the leader for it (in case it's not paired with it already).
//
// As an optimization, we could have tracked in request_partition_ctx whether
// the broker was provided by partition_leader() or any_broker(),
// so that we would only set_leader() later if we got the broker from any_broker(), which
// would save us an unordered_map<>::emplace.
//
// We are not doing this for now though.
#ifdef TANK_SUPPORT_CONSUME_FLAGS
if (use_provider) {
broker = partition_provider(topic, partition) ?: any_broker();
if (trace) {
SLog("provider ", broker ? broker->ep : Switch::endpoint{}, "\n");
}
} else
#endif
{
broker = partition_leader(topic, partition) ?: any_broker();
if (trace) {
SLog("leader ", broker ? broker->ep : Switch::endpoint{}, "\n");
}
}
// initialize the broker partition request context
req_part->topic = topic;
req_part->partition = partition;
req_part->as_op.consume.seq_num = it.second.first;
req_part->as_op.consume.min_fetch_size = it.second.second;
contexts.emplace_back(std::make_pair(broker, req_part));
if (trace) {
SLog("Fetch from ", req_part->topic, "/", req_part->partition, " from seqnum = ", it.second.first,
", min_fetch_size = ", it.second.second,
" (", size_repr(it.second.second), ")\n");
}
TANK_EXPECT(req_part->topic == topic);
TANK_EXPECT(req_part->partition == partition);
}
assign_req_partitions_to_api_req(api_req.get(), &contexts);
TANK_EXPECT(api_req->type == api_request::Type::Consume);
return schedule_new_api_req(std::move(api_req));
}
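// Usage sketch (endpoint and topic are made up, and this assumes the client has been
// pointed at a broker, e.g via set_default_leader(); a seq_num of UINT64_MAX requests
// tailing, per the requested_seqnum semantics in process_consume() below):
//
//   TankClient client;
//   client.set_default_leader("127.0.0.1:11011");
//
//   // fetch from orders/0 starting at seq.num 1, waiting up to 1s
//   const auto req_id = client.consume({{{"orders", 0}, {1 /* seq_num */, 64 * 1024 /* min_fetch_size */}}},
//                                      1000 /* max_wait(ms) */, 0 /* min_size */, 0 /* op_flags */);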
// TODO:
// There is an obvious optimization that we need to implement once we have the baseline functionality (i.e whatever's implemented currently) right,
// so that we can both profile and measure against it, but also verify against it for correctness.
// Instead of waiting until we have received the whole response (say, 64MBs worth of bundles), we can instead process it _progressively_. We would maintain
// a state machine, and we would progressively decode/parse bundles and message sets as we read in incoming data, so that by the time we have received
// the whole response, we would have effectively also parsed everything, because the data come in in batches (i.e we don't get the whole
// response in one packet, unless it's a tiny response).
// This provides for all kinds of interesting benefits:
// - processing the whole response (all bundles) at once takes a 'long' time, which means the thread blocks until that happens, thereby
// slowing processing of all other connections/requests/tasks. By interleaving/doing this concurrently, we
// only block the thread for far shorter amounts of time, and because the data arrive gradually, we can process the new data, and by the time we
// are done parsing new bundles, new data will have arrived, thereby doing this in an alternating fashion.
// - this could potentially provide for much faster consume requests, which is especially important for clients/consumers
// - reducing blocking of the main thread thanks to the incremental processing semantics would not only benefit TANK (see Service::process_peer_consume_resp() impl.)
// but also the client, because e.g AppServers may use a single client for multiple requests
//
// UPDATE: see client_api_fast_consume.cpp
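// The rough shape of that incremental design, for orientation only (hypothetical type;
// see client_api_fast_consume.cpp for the actual implementation):
//
//   struct consume_decode_ctx {
//       enum class State : uint8_t { RespHdr, TopicHdr, PartitionHdr, BundleLen, BundleHdr, MsgSet } state;
//       // partially decoded fields live here between network reads, so that each
//       // incoming chunk resumes from `state` instead of re-parsing from the start
//   };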
bool TankClient::process_consume(connection *const c, const uint8_t *const content, const size_t len) {
enum {
trace = false,
trace_faults = false,
};
assert(c);
assert(c->type == connection::Type::Tank);
assert(c->fd > 2);
const auto *p = content;
const auto end = p + len;
// the response header length is encoded in the response so that we can quickly jump to
// the beginning of the bundles.
// IMPORTANT: the bundles content of all involved partitions is streamed right AFTER the response header
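// Response layout, as decoded below (field widths follow the decode_pod<> calls):
//
//   u32 hdr_size            the bundles of all partitions begin hdr_size bytes past this field
//   u32 req_id
//   u8  topics_cnt
//   per topic:
//     str8 topic_name       u8 length prefix + bytes
//     u8   partitions_cnt   (a u16 of UINT16_MAX where the first partition id would
//                           be marks the whole topic as unknown)
//     per partition:
//       u16 partition_id
//       u8  err_flags
//       err_flags 0xfb (system error), 0xfd (no leader), 0xff (unknown partition): nothing more
//       err_flags 0xfc (different leader/provider): u32 ip + u16 port, nothing more
//       otherwise:
//         u64 log_base_seqnum     absent when err_flags == 0xfe (sparse first bundle)
//         u64 highwater_mark
//         u32 bundles_chunk_len
//         u64 first_avail_seqnum  only when err_flags == 0x1 (boundary check fault)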
if (unlikely(p + sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint8_t) > end)) {
if (trace) {
SLog("Unable to decode header\n");
}
return false;
}
const auto hdr_size = decode_pod<uint32_t>(p);
const auto *bundles_chunk = p + hdr_size; // first partition's bundles chunk (see ^^ about partition chunks)
const auto req_id = decode_pod<uint32_t>(p);
const auto it = pending_brokers_requests.find(req_id);
str_view8 key;
std::vector<request_partition_ctx *> no_leader, retry; // TODO: reuse
#ifndef LEAN_SWITCH
if (trace) {
// to verify that segmentation works well
// in Service::process_consume() where we create multiple data_vector_payload's if needed (search for patch_list_index)
// use different sizes for data_vector_payload::iov[], build TANK, and then try stress_test_local_tank.cpp with "consume many"
// and make sure hash is the same regardless of the size of iov[]
SLog(ansifmt::bold, ansifmt::color_brown, ansifmt::inverse, "CONSUME response len = ", len,
" (", size_repr(len), ")", ansifmt::reset,
" hash = ", str_view32(reinterpret_cast<const char *>(content), len).CRC32(), "\n");
}
#endif
if (it == pending_brokers_requests.end()) {
// this is probably fine; timed-out or whatever else
// we are just going to ignore it
if (trace) {
SLog("Ignoring, unknown broker request ", req_id, "(no longer around)\n");
}
return true;
}
auto br_req = it->second;
auto api_req = br_req->api_req;
auto br_req_partctx_it = br_req->partitions_list.next;
bool retain_buffer = false;
const auto topics_cnt = decode_pod<uint8_t>(p);
bool any_faults = false;
[[maybe_unused]] const auto before = Timings::Microseconds::Tick();
std::vector<IOBuffer *> used_buffers; // TODO: reuse
const auto req_op_flags = api_req->as.consume.flags; // 2022-08-18
const bool prefer_local_node = req_op_flags & unsigned(ConsumeFlags::prefer_local_node);
DEFER({
for (auto b : used_buffers) {
put_buffer(b);
}
});
if (trace) {
SLog("Response for topics ", topics_cnt, "\n");
}
// TODO:
// we would need to support "invalid requests",
// where e.g a consume request includes many thousands of partitions
// and we'd like to respond with an invalid-request error
for (size_t i{0}; i < topics_cnt; ++i) {
const auto len = decode_pod<uint8_t>(p);
if (unlikely(p + len + sizeof(uint8_t) > end)) {
if (trace) {
SLog("Unable to decode topic name and partitions count\n");
}
return false;
}
const str_view8 topic_name(reinterpret_cast<const char *>(p), len);
p += len;
const auto partitions_cnt = decode_pod<uint8_t>(p);
uint64_t log_base_seqnum;
if (trace) {
SLog("Topic [", topic_name, "], for ", partitions_cnt, " partitions from ", c->as.tank.br->ep, " for request ", api_req->request_id, "\n");
}
if (unlikely(p + sizeof(uint16_t) * partitions_cnt > end)) {
if (trace) {
SLog("Not enough content for partitions\n");
}
return false;
}
if (*reinterpret_cast<const uint16_t *>(p) == std::numeric_limits<uint16_t>::max()) {
if (trace_faults) {
SLog("Unknown topic [", topic_name, "]\n");
}
capture_unknown_topic_fault(api_req, topic_name);
any_faults = true;
// get rid of all partition contexts associated with this topic
do {
auto req_part = switch_list_entry(request_partition_ctx,
partitions_list_ll,
br_req_partctx_it);
auto next = br_req_partctx_it->next;
if (trace) {
SLog("Ignoring partition ", req_part->partition, "\n");
}
discard_request_partition_ctx(api_req, req_part);
br_req_partctx_it = next;
} while (br_req_partctx_it != &br_req->partitions_list &&
switch_list_entry(request_partition_ctx, partitions_list_ll, br_req_partctx_it)->topic == topic_name);
p += sizeof(uint16_t);
continue;
}
for (size_t k{0}; k < partitions_cnt; ++k) {
if (unlikely(p + sizeof(uint16_t) + sizeof(uint8_t) > end)) {
if (trace) {
SLog("Unable to decode (partition id, err_flags)\n");
}
return false;
}
const auto partition_id = decode_pod<uint16_t>(p);
const auto err_flags = decode_pod<uint8_t>(p);
enum {
trace = false,
};
if (trace) {
SLog(ansifmt::color_green, "For partition ", partition_id, ", err_flags ", err_flags, ansifmt::reset, "\n");
}
if (err_flags == 0xfb) {
// system error; likely open_partition_log() failed
auto next = br_req_partctx_it->next;
if (trace_faults) {
SLog("system error while attempting to access partition ", topic_name, "/", partition_id, "\n");
}
capture_system_fault(api_req, topic_name, partition_id);
any_faults = true;
TANK_EXPECT(br_req_partctx_it != &br_req->partitions_list);
auto req_part = switch_list_entry(request_partition_ctx, partitions_list_ll, br_req_partctx_it);
discard_request_partition_ctx(api_req, req_part);
br_req_partctx_it = next;
continue;
} else if (err_flags == 0xff) {
auto next = br_req_partctx_it->next;
if (trace) {
SLog("Undefined partition ", topic_name, "/", partition_id, "\n");
}
capture_unknown_partition_fault(api_req, topic_name, partition_id);
any_faults = true;
TANK_EXPECT(br_req_partctx_it != &br_req->partitions_list);
auto req_part = switch_list_entry(request_partition_ctx, partitions_list_ll, br_req_partctx_it);
discard_request_partition_ctx(api_req, req_part);
br_req_partctx_it = next;
continue;
} else if (err_flags == 0xfd) {
// no leader
enum {
trace = false,
};
auto next = br_req_partctx_it->next;
if (trace) {
SLog("No current leader for ", topic_name, "/", partition_id, "\n");
}
TANK_EXPECT(br_req_partctx_it != &br_req->partitions_list);
auto req_part = switch_list_entry(request_partition_ctx, partitions_list_ll, br_req_partctx_it);
no_leader.emplace_back(req_part);
req_part->partitions_list_ll.detach_and_reset();
br_req_partctx_it = next;
continue;
} else if (err_flags == 0xfc) {
// different leader or partition provider
enum {
trace = false,
};
auto next = br_req_partctx_it->next;
if (unlikely(p + sizeof(uint32_t) + sizeof(uint16_t) > end)) {
if (trace) {
SLog("Unable to decode new leader\n");
}
return false;
}
const Switch::endpoint ep{decode_pod<uint32_t>(p), decode_pod<uint16_t>(p)};
TANK_EXPECT(br_req_partctx_it != &br_req->partitions_list);
auto req_part = switch_list_entry(request_partition_ctx,
partitions_list_ll,
br_req_partctx_it);
if (trace) {
SLog(ansifmt::color_cyan, "Leader/provider for ", topic_name, "/", partition_id, " is now ", ep, ansifmt::reset, "\n");
}
if (prefer_local_node) {
// PARTITION_PROVIDER
if (trace) {
SLog("will set_partition_provider() to ", ep, "\n");
}
set_partition_provider(intern_topic(topic_name), partition_id, ep);
} else {
if (trace) {
SLog("Will set_partition_leader() to ", ep, "\n");
}
set_partition_leader(intern_topic(topic_name), partition_id, ep);
}
retry.emplace_back(req_part);
req_part->partitions_list_ll.detach_and_reset();
br_req_partctx_it = next;
continue;
} else if (err_flags == 0xfe) {
// the first bundle in the bundles chunk is a sparse bundle
// it encodes the first and last message seq.number in that bundle header
// so there is no base seq.number here
if (trace) {
SLog("SPARSE bundle\n");
}
log_base_seqnum = 0;
} else {
// base abs. seq.num of the first message in the first bundle of this chunk
if (unlikely(p + sizeof(uint64_t) > end)) {
if (trace) {
SLog("Unable to decode log_base_seqnum\n");
}
return false;
}
log_base_seqnum = decode_pod<uint64_t>(p);
if (trace) {
SLog("Not a sparse bundle, log_base_seqnum = ", log_base_seqnum, "\n");
}
}
TANK_EXPECT(br_req_partctx_it != &br_req->partitions_list);
auto req_part = switch_list_entry(request_partition_ctx,
partitions_list_ll, br_req_partctx_it);
if (prefer_local_node) {
// PARTITION_PROVIDER
set_partition_provider(req_part->topic, req_part->partition, br_req->br->ep);
} else {
// OK, we know that this broker is the current leader for this partition
// see comments in TankClient::consume() about an optimization
set_partition_leader(req_part->topic, req_part->partition, br_req->br->ep);
}
if (unlikely(p + sizeof(uint64_t) + sizeof(uint32_t) > end)) {
if (trace) {
SLog("Unable to decode hw mark and bundles_chunk_len\n");
}
return false;
}
const auto highwater_mark = decode_pod<uint64_t>(p);
const auto bundles_chunk_len = decode_pod<uint32_t>(p); // length of this partition's chunk that contains 0+ bundles
const auto requested_seqnum = req_part->as_op.consume.seq_num;
if (trace) {
SLog("err_flags = ", err_flags,
", log_base_seqnum = ", log_base_seqnum,
", highwater_mark = ", highwater_mark,
", bundles_chunk_len = ", bundles_chunk_len, " (", size_repr(bundles_chunk_len), ")",
", requested_seqnum = ", requested_seqnum, "\n");
}
if (err_flags == 0x1) {
// boundary check fault
auto next = br_req_partctx_it->next;
if (unlikely(p + sizeof(uint64_t) > end)) {
if (trace) {
SLog("Unable to decode first_avail_seqnum\n");
}
return false;
}
const auto first_avail_seqnum = decode_pod<uint64_t>(p);
if (trace_faults) {
SLog("Boundary check failed first_avail_seqnum = ", first_avail_seqnum, ", highwater_mark = ", highwater_mark, ", requested_seqnum = ", requested_seqnum, "\n");
}
capture_boundary_access_fault(api_req, topic_name, partition_id, first_avail_seqnum, highwater_mark);
any_faults = true;
TANK_EXPECT(br_req_partctx_it != &br_req->partitions_list);
auto req_part = switch_list_entry(request_partition_ctx, partitions_list_ll, br_req_partctx_it);
discard_request_partition_ctx(api_req, req_part);
br_req_partctx_it = next;
continue;
} else if (err_flags && err_flags < 0xfe) {
// TODO: captured_faults, get rid of partition
IMPLEMENT_ME();
continue;
}
// this is the reliable way to detect whether we have drained a partition,
// as opposed to e.g checking if no messages were captured and next.minFetchSize <= used_min_fetch_size
const auto drained_partition = (bundles_chunk_len == 0);
const auto partition_bundles = bundles_chunk;
uint64_t first_msg_seqnum, last_msg_seqnum;
bundles_chunk += bundles_chunk_len; // skip bundles for this partition
// we need to know how much data we are going to need for
// the next request. Initially, assume we'll need the whole bundle, but
// we are going to adjust need_upto if we can, depending on whether the message set of the bundle is compressed, etc.
// This makes a lot more sense than the previous design
const uint8_t *need_from, *need_upto;
bool any_captured{false};
// consider all bundles in this chunk
// last bundle in this chunk may be partial
msgs_bucket *first_bucket{nullptr}, *last_bucket{nullptr};
size_t consumed = 0;
uint32_t last_bucket_size = sizeof_array(msgs_bucket::data);
if (trace) {
SLog("partition bundles_chunk_len = ", bundles_chunk_len, " (", size_repr(bundles_chunk_len), "), drained_partition = ", drained_partition, "\n");
}
used_buffers.clear();
// process all bundles in this partition's bundles chunk
for (const auto *p = partition_bundles, *const chunk_end = std::min(end, bundles_chunk);;) {
need_from = p; // it's important to track need_from from the beginning of the bundle
need_upto = need_from + 1024; // 256 was a bit too low
if (unlikely(not Compression::check_decode_varuint32(p, chunk_end))) {
if (trace) {
SLog("Unable to decode bundle_len:", std::distance(need_from, chunk_end),
" to end of the chunk. Consumed so far ", size_repr(std::distance(content, need_from)), "\n");
}
break;
}
const auto bundle_len = Compression::decode_varuint32(p);
const auto bundle_end = p + bundle_len;
// assume we will need until the end of the bundle at least
// we will adjust this as we better understand the response structure
if (trace) {
SLog("bundle_len = ", bundle_len, "(", size_repr(bundle_len), ") at ", std::distance(content, p), "\n");
}
if (unlikely(p >= chunk_end)) {
if (trace) {
SLog("Unable to decode bundle_hdr_flags\n");
}
break;
}
// BEGIN: bundle header
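// Bundle layout, as decoded below (bundle_len itself was decoded as a varu32 just above):
//   u8 flags                bits 0..1: codec (0 = none, 1 = snappy),
//                           bits 2..5: msgset_size when <= 15, bit 6: sparse bundle
//   [varu32 msgset_size]    only when the 4-bit size field is 0 (i.e more than 15 messages)
//   if sparse: u64 first_msg_seqnum,
//              [varu32 (last_msg_seqnum - first_msg_seqnum - 1)] when msgset_size != 1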
const auto bundle_hdr_flags = decode_pod<uint8_t>(p);
const auto codec = bundle_hdr_flags & 3;
const auto sparse_bundle = bundle_hdr_flags & (1u << 6);
uint32_t msgset_size = (bundle_hdr_flags >> 2) & 0xf;
uint64_t msgset_end;
range_base<const uint8_t *, size_t> msgset_content;
if (0 == msgset_size) {
// message set size (in messages) > 15,
// so it is encoded separately as a varu32
if (unlikely(!Compression::check_decode_varuint32(p, chunk_end))) {
break;
} else {
msgset_size = Compression::decode_varuint32(p);
}
}
if (trace) {
SLog("codec = ", codec, ", msgset_size = ", msgset_size, ", sparse_bundle = ", sparse_bundle, "\n");
}
if (sparse_bundle) {
if (unlikely(p + sizeof(uint64_t) >= chunk_end)) {
break;
}
first_msg_seqnum = decode_pod<uint64_t>(p);
if (msgset_size != 1) {
if (unlikely(!Compression::check_decode_varuint32(p, chunk_end))) {
break;
}
last_msg_seqnum = first_msg_seqnum + Compression::decode_varuint32(p) + 1;
} else {
last_msg_seqnum = first_msg_seqnum;
}
log_base_seqnum = first_msg_seqnum;
msgset_end = last_msg_seqnum + 1;
if (trace) {
SLog("sparse: first_msg_seqnum = ", first_msg_seqnum, ", msgset_end = ", msgset_end, "\n");
}
} else {
msgset_end = log_base_seqnum + msgset_size;
}
// END: bundle header
if (requested_seqnum < std::numeric_limits<uint64_t>::max() && requested_seqnum >= msgset_end) {
// fast path: skip this bundle
p = bundle_end;
log_base_seqnum = msgset_end;
if (trace) {
SLog("Skipping content bundle because requested_seqnum(", requested_seqnum, ") >= msgset_end(", msgset_end, ")\n");
}
continue;
}
if (codec) {
if (trace) {
SLog("Need to decompress for ", codec, ", ", std::distance(p, bundle_end), " bytes\n");
}
if (bundle_end > chunk_end) {
if (trace) {
SLog("Not enough bytes: ", std::distance(chunk_end, bundle_end),
" (", size_repr(std::distance(chunk_end, bundle_end)), ") more required\n");
}
need_upto = bundle_end; // yeah, we'll need all the way to the end of the bundle
break;
}
auto b = get_buffer();
used_buffers.emplace_back(b);
switch (codec) {
case 1:
if (!Compression::UnCompress(Compression::Algo::SNAPPY, p, std::distance(p, bundle_end), b)) {
Print("Failed to decompress ", std::distance(p, bundle_end), " bytes for ", topic_name, "/", partition_id, ", at ", requested_seqnum, "\n");
IMPLEMENT_ME();
}
break;
default:
IMPLEMENT_ME();
}
msgset_content.set(reinterpret_cast<const uint8_t *>(b->data()), b->size());
} else {
retain_buffer = true;
msgset_content.set(p, std::min(std::distance(p, chunk_end), std::distance(p, bundle_end)));
}
// advance p past this bundle
p = bundle_end;
// scan this bundle's message set
uint64_t ts = 0;
uint32_t msg_idx = 0;
const auto min_accepted_seqnum = requested_seqnum == std::numeric_limits<uint64_t>::max() ? 0 : requested_seqnum;
if (trace) {
SLog("Scanning message set of length ", size_repr(msgset_content.size()), "\n");
}
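// Per-message layout within the (possibly decompressed) message set, as decoded below:
//   u8 msg_flags
//   if sparse bundle: the seq.num comes from the bundle header for the first/last message,
//                     is (prev + 1) when SeqNumPrevPlusOne is set, and otherwise advances
//                     by (varu32 delta + 1)
//   u64 ts                  only when UseLastSpecifiedTS is not set (else reuse the previous ts)
//   u8 key_len + key bytes  only when HaveKey is set
//   varu32 content_len, followed by the content bytes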
for (const auto *p = msgset_content.offset, *const msgset_end = p + msgset_content.size();; ++msg_idx, ++log_base_seqnum) {
if (!codec && any_captured) {
// this makes sense because we didn't need to decompress the bundle
need_upto = p + 256;
}
if (unlikely(p + sizeof(uint8_t) > msgset_end)) {
// likely hit end of the message set
// i.e (p == msgset_end)
if (trace && p != msgset_end) {
SLog("Unable to decode msg_flags\n");
}
// important; don't goto next_partition here
// (technically, you should jmp to next_partition if (p == msgset_end),
// and break otherwise)
break;
}
const auto msg_flags = decode_pod<uint8_t>(p);
if (sparse_bundle) {
if (trace) {
SLog("Yes, sparse_bundle\n");
}
if (msg_flags & unsigned(TankFlags::BundleMsgFlags::SeqNumPrevPlusOne)) {
// (prev + 1)
// no need to advance log_base_seqnum, was advanced in for()
if (trace) {
SLog("seqnum++\n");
}
} else if (0 == msg_idx) {
log_base_seqnum = first_msg_seqnum;
if (trace) {
SLog("First, setting to ", first_msg_seqnum, "\n");
}
} else if (msg_idx == msgset_size - 1) {
log_base_seqnum = last_msg_seqnum;
if (trace) {
SLog("Last, setting to ", last_msg_seqnum, "\n");
}
} else {
if (unlikely(!Compression::check_decode_varuint32(p, msgset_end))) {
if (trace) {
SLog("Can't decode delta\n");
}
goto next_partition;
}
const auto delta = Compression::decode_varuint32(p);
if (trace) {
SLog("Advance by ", delta + 1, "\n");
}
// not going to set log_base_seqnum to (delta + 1)
// because we'll (++log_base_seqnum) at the end of the iteration anyway
log_base_seqnum += delta;
}
}
const auto msg_abs_seqnum = log_base_seqnum;
if (trace) {
SLog("abs.seqnum = ", msg_abs_seqnum, "\n");
}
if (0 == (msg_flags & unsigned(TankFlags::BundleMsgFlags::UseLastSpecifiedTS))) {
if (unlikely(p + sizeof(uint64_t) > msgset_end)) {
if (trace) {
SLog("Not Enough Content Available\n");
}
goto next_partition;
}
ts = decode_pod<uint64_t>(p);
if (trace) {
SLog("New TS", ts, "\n");
}
} else if (trace) {
SLog("Using last TS\n");
}
if (msg_flags & unsigned(TankFlags::BundleMsgFlags::HaveKey)) {
if (unlikely(p + sizeof(uint8_t) > msgset_end || (p + *p + sizeof(uint8_t) > msgset_end))) {
if (trace) {
SLog("Not Enough Content Available\n");
}
goto next_partition;
} else {
key.set(reinterpret_cast<const char *>(p) + 1, *p);
p += sizeof(uint8_t) + key.size();
}
} else {
key.reset();
}
if (unlikely(!Compression::check_decode_varuint32(p, msgset_end))) {
if (trace) {
SLog("Not Enough Content Available\n");
}
goto next_partition;
}
const auto len = Compression::decode_varuint32(p);
if (trace) {
SLog("Message content length:", len, "\n");
}
if (const auto e = p + len; e > msgset_end) {
if (!codec && any_captured) {
// see above
need_upto = e + 256;
}
if (trace) {
SLog("Past msgset_end, needed additional:", e - msgset_end, " bytes to accept content\n");
}
goto next_partition;
}
if (msg_abs_seqnum > highwater_mark) {
// abort immediately
// we need to respect the semantics
if (trace) {
SLog("Past HW mark (msg_abs_seqnum = ", msg_abs_seqnum, ", highwater_mark = ", highwater_mark, ")\n");
}
goto next_partition;
} else if (msg_abs_seqnum >= min_accepted_seqnum) {
const str_view32 content(reinterpret_cast<const char *>(p), len);
if (last_bucket_size == sizeof_array(msgs_bucket::data)) {
auto b = get_msgs_bucket();
b->next = nullptr;
if (last_bucket) {
consumed += last_bucket_size;
last_bucket->next = b;
} else {
first_bucket = b;
}
last_bucket = b;
last_bucket_size = 0;
}
auto m = last_bucket->data + last_bucket_size++;
any_captured = true;
m->seqNum = msg_abs_seqnum;
m->content = content;
m->ts = ts;
m->key = key;
if (trace && false) {
SLog("Got key = [", key, "] ts = ",
Date::ts_repr(ts / 1000),
", content.len = ", size_repr(content.size()), "\n");
}
}
p += len; // to next bundle for this partition
}
}
next_partition:
TANK_EXPECT(need_from <= need_upto);
if (last_bucket) {
consumed += last_bucket_size;
}
auto next = br_req_partctx_it->next;
const auto next_min_span = std::distance(need_from, need_upto); // TODO: + 256
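// next_seqnum is where the next consume request for this partition should resume:
// - if we captured messages: right past the last captured seq.num (and at least
//   the requested seq.num for non-tail requests)
// - otherwise: past the highwater mark for tail requests (UINT64_MAX), or the same
//   requested_seqnum again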
const auto next_seqnum = consumed
? requested_seqnum == std::numeric_limits<uint64_t>::max()
? last_bucket->data[last_bucket_size - 1].seqNum + 1
: std::max(requested_seqnum, last_bucket->data[last_bucket_size - 1].seqNum + 1)
: requested_seqnum == std::numeric_limits<uint64_t>::max() ? highwater_mark + 1
: requested_seqnum;
auto &req_part_resp = req_part->as_op.consume.response;
if (trace) {
SLog(ansifmt::bgcolor_red, "consumed = ", consumed,
", used_buffers = ", used_buffers.size(),
", next_min_span = ", next_min_span, " (", size_repr(next_min_span), " )",
", next_seqnum = ", next_seqnum, ansifmt::reset, "\n");
}
TANK_EXPECT(next_min_span < 256 * 1024 * 1024); // sanity check
req_part->as_op.response_valid = true;
req_part_resp.next.seq_num = next_seqnum;
req_part_resp.next.min_size = next_min_span;
req_part_resp.msgs.cnt = consumed;
// TODO: