Skip to content

Commit

Permalink
JAVA-3125: Match broadcast RPC for control connection and Astra events
Browse files Browse the repository at this point in the history
patch by Lukasz Antoniak; reviewed by Bret McGuire and Andrew Tolbert for JAVA-3125
reference: #1981

Fix to address Scassandra test issues
  • Loading branch information
lukasz-antoniak authored and absurdfarce committed Jan 17, 2025
1 parent a221439 commit 1876645
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,11 @@ private static void updateInfo(
broadcastRpcAddress = new InetSocketAddress(nativeAddress, nativePort);
} else if (row.getColumnDefinitions().contains("rpc_address")) {
InetAddress rpcAddress = row.getInet("rpc_address");
broadcastRpcAddress = new InetSocketAddress(rpcAddress, cluster.connectionFactory.getPort());
int nativePort = cluster.connectionFactory.getPort();
if (row.getColumnDefinitions().contains("rpc_port")) {
nativePort = row.getInt("rpc_port");
}
broadcastRpcAddress = new InetSocketAddress(rpcAddress, nativePort);
}
// Before CASSANDRA-9436, system.local doesn't have rpc_address, so this might be null. It's not
// a big deal because we only use this for server events, and the control node doesn't receive
Expand Down Expand Up @@ -854,8 +858,11 @@ private void refreshNodeListAndTokenMap(
broadcastRpcAddress = new InetSocketAddress(nativeAddress, nativePort);
} else {
InetAddress rpcAddress = row.getInet("rpc_address");
broadcastRpcAddress =
new InetSocketAddress(rpcAddress, cluster.connectionFactory.getPort());
int nativePort = cluster.connectionFactory.getPort();
if (row.getColumnDefinitions().contains("rpc_port")) {
nativePort = row.getInt("rpc_port");
}
broadcastRpcAddress = new InetSocketAddress(rpcAddress, nativePort);
}
broadcastRpcAddresses.add(broadcastRpcAddress);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.datastax.driver.core.Assertions.assertThat;
import static com.datastax.driver.core.CreateCCM.TestMode.PER_METHOD;
import static com.datastax.driver.core.ScassandraCluster.SELECT_LOCAL;
import static com.datastax.driver.core.ScassandraCluster.SELECT_LOCAL_RPC_ADDRESS_AND_PORT;
import static com.datastax.driver.core.ScassandraCluster.SELECT_PEERS;
import static com.datastax.driver.core.ScassandraCluster.SELECT_PEERS_DSE68;
import static com.datastax.driver.core.ScassandraCluster.SELECT_PEERS_V2;
Expand Down Expand Up @@ -659,6 +660,50 @@ public void should_extract_hosts_using_rpc_address_from_peers() throws UnknownHo
runPeerTest(state);
}

@Test(groups = "short")
@CCMConfig(createCcm = false)
public void should_extract_hosts_port_using_rpc_port_from_local() throws UnknownHostException {
InetAddress expectedAddress = InetAddress.getByName("1.2.3.4");
int expectedPort = 29042;
PeerRowState state =
PeerRowState.builder()
.local("rpc_address", expectedAddress)
.local("rpc_port", expectedPort)
.build();

ScassandraCluster scassandras =
ScassandraCluster.builder().withNodes(2).withPeersV2(state.usePeersV2()).build();
scassandras.init();

Cluster cluster = null;
try {
scassandras.node(1).primingClient().clearAllPrimes();
PrimingClient primingClient = scassandras.node(1).primingClient();
primingClient.prime(
PrimingRequest.queryBuilder()
.withQuery("SELECT * FROM system.local WHERE key='local'")
.withThen(
then()
.withColumnTypes(SELECT_LOCAL_RPC_ADDRESS_AND_PORT)
.withRows(state.getLocalRow())
.build())
.build());
cluster =
Cluster.builder()
.addContactPoints(scassandras.address(1).getAddress())
.withPort(scassandras.getBinaryPort())
.withNettyOptions(nonQuietClusterCloseOptions)
.build();
cluster.connect();

assertThat(cluster.manager.getControlConnection().connectedHost().getBroadcastRpcAddress())
.isEqualTo(new InetSocketAddress(expectedAddress, expectedPort));
} finally {
if (cluster != null) cluster.close();
scassandras.stop();
}
}

private void runPeerTest(PeerRowState state) {

ScassandraCluster scassandras =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,14 +658,14 @@ private Object getPeerInfo(int dc, int node, String property, Object defaultValu
column("cluster_name", TEXT),
column("cql_version", TEXT),
column("data_center", TEXT),
column("graph", BOOLEAN),
column("host_id", UUID),
column("listen_address", INET),
column("partitioner", TEXT),
column("rack", TEXT),
column("release_version", TEXT),
column("tokens", set(TEXT)),
column("graph", BOOLEAN),
column("host_id", UUID),
column("schema_version", UUID)
column("schema_version", UUID),
column("tokens", set(TEXT))
};

public static final org.scassandra.http.client.types.ColumnMetadata[] SELECT_LOCAL_V2 = {
Expand All @@ -676,15 +676,36 @@ private Object getPeerInfo(int dc, int node, String property, Object defaultValu
column("cluster_name", TEXT),
column("cql_version", TEXT),
column("data_center", TEXT),
column("graph", BOOLEAN),
column("host_id", UUID),
column("listen_address", INET),
column("listen_port", INT),
column("partitioner", TEXT),
column("rack", TEXT),
column("release_version", TEXT),
column("tokens", set(TEXT)),
column("graph", BOOLEAN),
column("schema_version", UUID),
column("tokens", set(TEXT))
};

public static final org.scassandra.http.client.types.ColumnMetadata[]
SELECT_LOCAL_RPC_ADDRESS_AND_PORT = {
column("key", TEXT),
column("bootstrapped", TEXT),
column("broadcast_address", INET),
column("broadcast_port", INT),
column("cluster_name", TEXT),
column("cql_version", TEXT),
column("data_center", TEXT),
column("host_id", UUID),
column("schema_version", UUID)
column("listen_address", INET),
column("listen_port", INT),
column("partitioner", TEXT),
column("rack", TEXT),
column("release_version", TEXT),
column("rpc_address", INET),
column("rpc_port", INT),
column("schema_version", UUID),
column("tokens", set(TEXT))
};

static final org.scassandra.http.client.types.ColumnMetadata[] SELECT_CLUSTER_NAME = {
Expand Down

0 comments on commit 1876645

Please sign in to comment.