forked from apache/doris
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy paths3_rate_limiter.cpp
155 lines (133 loc) · 4.96 KB
/
s3_rate_limiter.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "s3_rate_limiter.h"
#include <glog/logging.h> // IWYU pragma: export
#include <chrono>
#include <mutex>
#include <thread>
#if defined(__APPLE__)
#include <ctime>
#endif
// Expands to the current time point of std::chrono::system_clock (wall clock).
// NOTE(review): the tick unit of this clock's time_since_epoch().count() is
// implementation-defined -- confirm every user gets the unit it expects.
#define CURRENT_TIME std::chrono::system_clock::now()
namespace doris {
// Nanoseconds per second (10^9); used to convert between nanosecond counts and seconds.
static constexpr auto NS = 1000000000UL;
// Minimal test-and-set spin lock built on std::atomic_flag.
// It exposes lock()/unlock(), so it can be driven by std::lock_guard.
class S3RateLimiter::SimpleSpinLock {
public:
    SimpleSpinLock() = default;
    ~SimpleSpinLock() = default;

    // Busy-waits until the flag is acquired. After every MAX_SPIN_COUNT failed
    // attempts a warning is logged together with the number of spins so far.
    void lock() {
        static constexpr int MAX_SPIN_COUNT = 50;
        // Cumulative number of failed attempts for this lock() call; kept
        // unsigned and wide so a long wait cannot overflow it.
        unsigned long long spin_count = 0;
        while (_flag.test_and_set(std::memory_order_acq_rel)) {
            ++spin_count;
            if (spin_count % MAX_SPIN_COUNT == 0) {
                LOG(WARNING) << "Warning: Excessive spinning detected while acquiring lock. Spin "
                                "count: "
                             << spin_count;
            }
            // Spin until we acquire the lock
        }
    }

    // Releases the lock so that another spinning thread can acquire it.
    void unlock() { _flag.clear(std::memory_order_release); }

private:
    // Set while the lock is held, clear otherwise.
    std::atomic_flag _flag = ATOMIC_FLAG_INIT;
};
// Builds a limiter from its three settings; the bucket starts with `max_burst` tokens.
//   max_speed: refill rate in tokens per second; 0 disables the token update
//              and the throttling sleep.
//   max_burst: upper bound on the tokens the bucket may hold, also its initial fill.
//   limit:     cap on the cumulative amount passed to add(); 0 disables the check.
S3RateLimiter::S3RateLimiter(size_t max_speed, size_t max_burst, size_t limit)
: _max_speed(max_speed),
_max_burst(max_burst),
_limit(limit),
_mutex(std::make_unique<S3RateLimiter::SimpleSpinLock>()),
_remain_tokens(max_burst) {}
// Defaulted destructor, defined out of line.
// NOTE(review): presumably defined in this file because _mutex owns a
// SimpleSpinLock, whose definition is only visible here -- confirm against the header.
S3RateLimiter::~S3RateLimiter() = default;
// Defaulted destructor, defined out of line; releases the owned rate limiter.
S3RateLimiterHolder::~S3RateLimiterHolder() = default;
// Refills the bucket for the time elapsed since the previous call, charges
// `amount` against it and adds `amount` to the running total. The whole
// update happens while holding the spin lock.
//
//   now:    caller's timestamp in nanoseconds. A value older than the last
//           recorded timestamp is replaced by the recorded one, so the
//           elapsed time is never negative.
//   amount: quantity being charged.
//
// Returns {running total after this call, remaining tokens after this call}.
// The token count may be negative when the bucket is overdrawn.
std::pair<size_t, double> S3RateLimiter::_update_remain_token(long now, size_t amount) {
    std::lock_guard<SimpleSpinLock> guard(*_mutex);

    // Clamp so time never appears to run backwards.
    if (now < _prev_ns_count) {
        now = _prev_ns_count;
    }

    if (_max_speed) {
        // The very first call has no previous timestamp and earns no refill.
        double elapsed_seconds = 0;
        if (_prev_ns_count) {
            elapsed_seconds = static_cast<double>(now - _prev_ns_count) / NS;
        }
        // Refill, charge, then cap at the burst size.
        _remain_tokens = std::min<double>(_remain_tokens + _max_speed * elapsed_seconds - amount,
                                          _max_burst);
    }

    _prev_ns_count = now;
    _count += amount;

    // Snapshot the shared state while the lock is still held.
    const size_t total_after = _count;
    const double tokens_after = _remain_tokens;
    return {total_after, tokens_after};
}
// Charges `amount` against the limiter and blocks the calling thread when the
// bucket is overdrawn.
//
// Returns -1, without sleeping, when a non-zero limit is configured and the
// cumulative amount now exceeds it (the amount has already been recorded by
// then). Otherwise returns the number of nanoseconds this call slept, which
// is 0 when no throttling was needed.
int64_t S3RateLimiter::add(size_t amount) {
    // The tick period of system_clock is implementation-defined, so convert
    // explicitly: the token bookkeeping divides this value by NS and therefore
    // needs nanoseconds.
    const auto now_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
                                CURRENT_TIME.time_since_epoch())
                                .count();
    const auto [count_value, tokens_value] = _update_remain_token(now_ns, amount);

    if (_limit && count_value > _limit) {
        // CK would throw exception
        return -1;
    }

    // Wait unless there is positive amount of remain_tokens - throttling
    int64_t sleep_time_ns = 0;
    if (_max_speed && tokens_value < 0) {
        // Time needed for the refill rate to pay back the deficit.
        sleep_time_ns = static_cast<int64_t>(-tokens_value / _max_speed * NS);
        std::this_thread::sleep_for(std::chrono::nanoseconds(sleep_time_ns));
    }
    return sleep_time_ns;
}
// Creates the holder with a freshly built S3RateLimiter(max_speed, max_burst, limit)
// and stores `metric_func`, which add() calls with its result.
// `type` is not used by this constructor.
S3RateLimiterHolder::S3RateLimiterHolder(S3RateLimitType type, size_t max_speed, size_t max_burst,
size_t limit, std::function<void(int64_t)> metric_func)
: rate_limiter(std::make_unique<S3RateLimiter>(max_speed, max_burst, limit)),
metric_func(std::move(metric_func)) {}
// Forwards `amount` to the current rate limiter and reports the outcome to the
// metric callback.
//
// Returns whatever S3RateLimiter::add returned: the nanoseconds slept, or -1
// when the limiter's cumulative limit was exceeded.
int64_t S3RateLimiterHolder::add(size_t amount) {
    int64_t slept_ns = 0;
    {
        // Shared lock: concurrent add() calls may run together, while reset()
        // takes the lock exclusively before swapping the limiter.
        std::shared_lock read {rate_limiter_rw_lock};
        slept_ns = rate_limiter->add(amount);
    }
    // An empty std::function would throw std::bad_function_call; treat a
    // missing callback as "no metric to report".
    if (metric_func) {
        metric_func(slept_ns);
    }
    return slept_ns;
}
// Replaces the held limiter with a new one built from the given settings.
// The swap is done under the exclusive side of the reader/writer lock.
// Always returns 0.
int S3RateLimiterHolder::reset(size_t max_speed, size_t max_burst, size_t limit) {
    std::unique_lock exclusive {rate_limiter_rw_lock};
    rate_limiter = std::make_unique<S3RateLimiter>(max_speed, max_burst, limit);
    return 0;
}
// Returns the textual name of a rate-limit type: "get" or "put".
// Any other value is rendered as its numeric value in decimal.
std::string to_string(S3RateLimitType type) {
    if (type == S3RateLimitType::GET) {
        return "get";
    }
    if (type == S3RateLimitType::PUT) {
        return "put";
    }
    return std::to_string(static_cast<size_t>(type));
}
// Parses a rate-limit type name. Only the exact strings "get" and "put" are
// recognised; every other input yields S3RateLimitType::UNKNOWN.
S3RateLimitType string_to_s3_rate_limit_type(std::string_view value) {
    if (value == "get") {
        return S3RateLimitType::GET;
    }
    if (value == "put") {
        return S3RateLimitType::PUT;
    }
    return S3RateLimitType::UNKNOWN;
}
} // namespace doris