Skip to content

Commit

Permalink
Merge pull request #24447 from bashtanov/migrations-check-license
Browse files Browse the repository at this point in the history
Migrations check license
  • Loading branch information
bashtanov authored Dec 6, 2024
2 parents c4d57ee + 89b4908 commit e2c9c1f
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 44 deletions.
66 changes: 31 additions & 35 deletions src/v/cluster/data_migration_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ frontend::frontend(
: _self(self)
, _cloud_storage_api_initialized(cloud_storage_api_initialized)
, _table(table)
, _features(features)
, _features(features.local())
, _controller(stm)
, _leaders_table(leaders)
, _connections(connections)
, _leaders_table(leaders.local())
, _connections(connections.local())
, _topic_mount_handler(topic_mount_handler)
, _as(as)
, _operation_timeout(10s) {}
Expand All @@ -69,8 +69,7 @@ frontend::process_or_dispatch(
DispatchFunc dispatch,
ProcessFunc process,
ReplyMapperFunc reply_mapper) {
auto controller_leader = _leaders_table.local().get_leader(
model::controller_ntp);
auto controller_leader = _leaders_table.get_leader(model::controller_ntp);
/// Return early if there is no controller leader
if (!controller_leader) {
vlog(
Expand Down Expand Up @@ -102,18 +101,17 @@ frontend::process_or_dispatch(
req,
*controller_leader);
/// If leader is somewhere else, dispatch RPC request to current leader
auto reply = co_await _connections.local()
.with_node_client<data_migrations_client_protocol>(
_self,
ss::this_shard_id(),
*controller_leader,
_operation_timeout,
[req = std::move(req),
dispatch = std::forward<DispatchFunc>(dispatch)](
data_migrations_client_protocol client) mutable {
return dispatch(std::move(req), client)
.then(&rpc::get_ctx_data<Reply>);
});
auto reply
= co_await _connections.with_node_client<data_migrations_client_protocol>(
_self,
ss::this_shard_id(),
*controller_leader,
_operation_timeout,
[req = std::move(req), dispatch = std::forward<DispatchFunc>(dispatch)](
data_migrations_client_protocol client) mutable {
return dispatch(std::move(req), client)
.then(&rpc::get_ctx_data<Reply>);
});
vlog(
dm_log.debug,
"got reply {} from controller leader at {}",
Expand All @@ -122,14 +120,15 @@ frontend::process_or_dispatch(
co_return reply_mapper(std::move(reply));
}

bool frontend::data_migrations_active() const {
return _features.local().is_active(features::feature::data_migrations)
&& _cloud_storage_api_initialized;
bool frontend::data_migrations_active(bool check_license) const {
return _features.is_active(features::feature::data_migrations)
&& _cloud_storage_api_initialized
&& !(check_license && _features.should_sanction());
}

ss::future<result<id>> frontend::create_migration(
data_migration migration, can_dispatch_to_leader can_dispatch) {
if (!data_migrations_active()) {
if (!data_migrations_active(true)) {
return ssx::now<result<id>>(errc::feature_disabled);
}
vlog(dm_log.debug, "creating migration: {}", migration);
Expand Down Expand Up @@ -164,7 +163,7 @@ ss::future<result<id>> frontend::create_migration(

ss::future<std::error_code> frontend::update_migration_state(
id id, state state, can_dispatch_to_leader can_dispatch) {
if (!data_migrations_active()) {
if (!data_migrations_active(false)) {
return ssx::now<std::error_code>(errc::feature_disabled);
}
vlog(dm_log.debug, "updating migration: {} state with: {}", id, state);
Expand Down Expand Up @@ -198,7 +197,7 @@ ss::future<std::error_code> frontend::update_migration_state(

ss::future<std::error_code>
frontend::remove_migration(id id, can_dispatch_to_leader can_dispatch) {
if (!data_migrations_active()) {
if (!data_migrations_active(false)) {
return ssx::now<std::error_code>(errc::feature_disabled);
}
vlog(dm_log.debug, "removing migration: {}", id);
Expand Down Expand Up @@ -233,7 +232,7 @@ ss::future<check_ntp_states_reply> frontend::check_ntp_states_on_foreign_node(
model::node_id node, check_ntp_states_request&& req) {
vlog(dm_log.debug, "dispatching node request {} to node {}", req, node);

return _connections.local()
return _connections
.with_node_client<data_migrations_client_protocol>(
_self,
ss::this_shard_id(),
Expand Down Expand Up @@ -295,21 +294,17 @@ ss::future<result<id>> frontend::do_create_migration(data_migration migration) {
}

ss::future<chunked_vector<migration_metadata>> frontend::list_migrations() {
return container().invoke_on(data_migrations_shard, [](frontend& local) {
return local._table.local().list_migrations();
});
return _table.invoke_on_instance(&migrations_table::list_migrations);
}

ss::future<result<migration_metadata>>
frontend::get_migration(id migration_id) {
return container().invoke_on(
data_migrations_shard, [migration_id](frontend& local) {
auto maybe_migration = local._table.local().get_migration(
migration_id);
return maybe_migration
? result<migration_metadata>(maybe_migration->get().copy())
: errc::data_migration_not_exists;
});
return _table.invoke_on_instance([migration_id](migrations_table& table) {
auto maybe_migration = table.get_migration(migration_id);
return maybe_migration
? result<migration_metadata>(maybe_migration->get().copy())
: errc::data_migration_not_exists;
});
}

ss::future<frontend::list_mountable_topics_result>
Expand All @@ -333,6 +328,7 @@ ss::future<std::error_code> frontend::insert_barrier() {
* required for correctness but allows the frontend to do more accurate
* preliminary validation.
*/
static_assert(controller_stm_shard == data_migrations_shard);
auto barrier_result
= co_await _controller.local().insert_linearizable_barrier(
_operation_timeout + model::timeout_clock::now());
Expand Down
8 changes: 4 additions & 4 deletions src/v/cluster/data_migration_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,16 @@ class frontend : public ss::peering_sharded_service<frontend> {
"This method can only be called on data migration shard");
}

bool data_migrations_active() const;
bool data_migrations_active(bool check_license) const;

private:
model::node_id _self;
bool _cloud_storage_api_initialized;
ssx::single_sharded<migrations_table>& _table;
ss::sharded<features::feature_table>& _features;
features::feature_table& _features;
ss::sharded<controller_stm>& _controller;
ss::sharded<partition_leaders_table>& _leaders_table;
ss::sharded<rpc::connection_cache>& _connections;
partition_leaders_table& _leaders_table;
rpc::connection_cache& _connections;
std::optional<std::reference_wrapper<cloud_storage::topic_mount_handler>>
_topic_mount_handler;
ss::sharded<ss::abort_source>& _as;
Expand Down
61 changes: 56 additions & 5 deletions tests/rptest/tests/data_migrations_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,8 +581,29 @@ def test_mount_inexistent(self):
self.admin.delete_data_migration(in_migration_id)
self.wait_migration_disappear(in_migration_id)

def toggle_license(self, on: bool):
    """Switch the built-in trial license on or off and restart the cluster.

    on=True removes the '__REDPANDA_DISABLE_BUILTIN_TRIAL_LICENSE'
    environment variable; on=False sets it to '1'. In both cases every
    node in self.redpanda.nodes is then rolling-restarted with
    maintenance mode disabled.
    """
    disable_trial_var = '__REDPANDA_DISABLE_BUILTIN_TRIAL_LICENSE'
    if not on:
        self.redpanda.set_environment({disable_trial_var: '1'})
    else:
        self.redpanda.unset_environment([disable_trial_var])
    # The environment change is followed by a restart of all nodes.
    self.redpanda.rolling_restart_nodes(
        self.redpanda.nodes,
        use_maintenance_mode=False,
    )

@cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
def test_creating_and_listing_migrations(self):
    """Run the shared create/list migrations scenario with try_wo_license off."""
    self.do_test_creating_and_listing_migrations(try_wo_license=False)

@cluster(num_nodes=3,
         log_allow_list=MIGRATION_LOG_ALLOW_LIST + [
             # license violation
             r'/v1/migrations.*Requested feature is disabled',
         ])
def test_creating_and_listing_migrations_wo_license(self):
    """Run the shared create/list migrations scenario with try_wo_license on.

    The allow list is extended with the "Requested feature is disabled"
    pattern for /v1/migrations log lines.
    """
    self.do_test_creating_and_listing_migrations(try_wo_license=True)

def do_test_creating_and_listing_migrations(self, try_wo_license: bool):
topics = [TopicSpec(partition_count=3) for i in range(5)]

for t in topics:
Expand All @@ -591,20 +612,35 @@ def test_creating_and_listing_migrations(self):
migrations_map = self.get_migrations_map()
assert len(migrations_map) == 0, "There should be no data migrations"

with self.finj_thread():
if try_wo_license:
time.sleep(2) # make sure test harness can see Redpanda is live
self.toggle_license(on=False)
self.assure_not_migratable(
topics[0], {
"message":
"Unexpected cluster error: Requested feature is disabled",
"code": 500
})
self.toggle_license(on=True)

with nullcontext() if try_wo_license else self.finj_thread():
# out
outbound_topics = [make_namespaced_topic(t.name) for t in topics]
out_migration = OutboundDataMigration(outbound_topics,
consumer_groups=[])

out_migration_id = self.create_and_wait(out_migration)

if try_wo_license:
self.toggle_license(on=False)

self.check_migrations(out_migration_id, len(topics), 1)

self.execute_data_migration_action_flaky(out_migration_id,
MigrationAction.prepare)
self.wait_for_migration_states(out_migration_id,
['preparing', 'prepared'])
self.wait_for_migration_states(out_migration_id, ['prepared'])

self.execute_data_migration_action_flaky(out_migration_id,
MigrationAction.execute)
self.wait_for_migration_states(out_migration_id,
Expand All @@ -631,17 +667,32 @@ def test_creating_and_listing_migrations(self):
]
in_migration = InboundDataMigration(topics=inbound_topics,
consumer_groups=["g-1", "g-2"])
self.logger.info(f'{try_wo_license=}')
if try_wo_license:
self.toggle_license(on=True)
in_migration_id = self.create_and_wait(in_migration)
if try_wo_license:
self.toggle_license(on=False)
self.check_migrations(in_migration_id, len(inbound_topics), 2)

self.log_topics(t.source_topic_reference.topic
for t in inbound_topics)

self.execute_data_migration_action_flaky(in_migration_id,
MigrationAction.prepare)
self.wait_for_migration_states(in_migration_id,
['preparing', 'prepared'])
self.wait_for_migration_states(in_migration_id, ['prepared'])
if try_wo_license:
self.wait_for_migration_states(in_migration_id, ['preparing'])
time.sleep(5)
# stuck as a topic cannot be created without license
self.wait_for_migration_states(in_migration_id, ['preparing'])
self.toggle_license(on=True)
self.wait_for_migration_states(in_migration_id, ['prepared'])
self.toggle_license(on=False)
else:
self.wait_for_migration_states(in_migration_id,
['preparing', 'prepared'])
self.wait_for_migration_states(in_migration_id, ['prepared'])

self.execute_data_migration_action_flaky(in_migration_id,
MigrationAction.execute)
self.wait_for_migration_states(in_migration_id,
Expand Down

0 comments on commit e2c9c1f

Please sign in to comment.