Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Azure Event Hubs Emulator container to Azure module #9665

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
39 changes: 38 additions & 1 deletion docs/modules/azure.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ This module is INCUBATING. While it is ready for use and operational in the curr

Testcontainers module for the Microsoft Azure's [SDK](https://github.com/Azure/azure-sdk-for-java).

Currently, the module supports `Azurite` and `CosmosDB` emulators. In order to use them, you should use the following classes:
Currently, the module supports `Azurite`, `Azure Event Hubs` and `CosmosDB` emulators. In order to use them, you should use the following classes:

Class | Container Image
-|-
AzuriteContainer | [mcr.microsoft.com/azure-storage/azurite](https://github.com/microsoft/containerregistry)
AzureEventHubsEmulatorContainer | [mcr.microsoft.com/azure-messaging/eventhubs-emulator](https://github.com/microsoft/containerregistry)
CosmosDBEmulatorContainer | [mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator](https://github.com/microsoft/containerregistry)

## Usage example
Expand Down Expand Up @@ -72,6 +73,42 @@ Build Azure Table client:
!!! note
We can use custom credentials the same way as defined in the Blob section.

### Azure Event Hubs Emulator

<!--codeinclude-->
[Configuring the Azure Event Hubs Emulator container](../../modules/azure/src/test/resources/eventhubs_config.json)
<!--/codeinclude-->

Start Azure Event Hubs Emulator during a test:

<!--codeinclude-->
[Setting up a network](../../modules/azure/src/test/java/org/testcontainers/azure/AzureEventHubsEmulatorContainerTest.java) inside_block:network
<!--/codeinclude-->

<!--codeinclude-->
[Starting an Azurite container as dependency](../../modules/azure/src/test/java/org/testcontainers/azure/AzureEventHubsEmulatorContainerTest.java) inside_block:azuriteContainer
<!--/codeinclude-->

<!--codeinclude-->
[Starting an Azure Event Hubs Emulator container](../../modules/azure/src/test/java/org/testcontainers/azure/AzureEventHubsEmulatorContainerTest.java) inside_block:emulatorContainer
<!--/codeinclude-->

#### Using Azure Event Hubs clients

Configure the consumer and the producer clients:

<!--codeinclude-->
[Configuring the clients](../../modules/azure/src/test/java/org/testcontainers/azure/AzureEventHubsEmulatorContainerTest.java) inside_block:createProducerAndConsumer
<!--/codeinclude-->

#### Using Kafka clients

Configure the consumer and the producer clients:

<!--codeinclude-->
[Obtaining the Kafka connection properties](../../modules/azure/src/test/java/org/testcontainers/azure/AzureEventHubsEmulatorContainerTest.java) inside_block:kafkaProperties
<!--/codeinclude-->

### CosmosDB

Start Azure CosmosDB Emulator during a test:
Expand Down
2 changes: 2 additions & 0 deletions modules/azure/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ dependencies {
testImplementation 'com.azure:azure-storage-blob:12.29.0'
testImplementation 'com.azure:azure-storage-queue:12.24.0'
testImplementation 'com.azure:azure-data-tables:12.5.0'
testImplementation 'com.azure:azure-messaging-eventhubs:5.19.2'
testImplementation 'org.apache.kafka:kafka-clients:3.8.0'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package org.testcontainers.azure;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.LicenseAcceptance;

/**
* Testcontainers implementation for Azure Eventhubs Emulator.
* <p>
* Supported image: {@code "mcr.microsoft.com/azure-messaging/eventhubs-emulator"}
* <p>
* Exposed ports:
* <ul>
* <li>AMQP: 5672</li>
* <li>Kafka: 9092</li>
* </ul>
*/
public class AzureEventHubsEmulatorContainer extends GenericContainer<AzureEventHubsEmulatorContainer> {

private static final int DEFAULT_AMQP_PORT = 5672;

private static final int DEFAULT_KAFKA_PORT = 9092;

private static final String CONNECTION_STRING_FORMAT =
"Endpoint=sb://%s:%d;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;";

private static final String BOOTSTRAP_SERVERS_FORMAT = "%s:%d";

private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse(
"mcr.microsoft.com/azure-messaging/eventhubs-emulator"
);

private AzuriteContainer azuriteContainer;

private boolean useKafka;

/**
* @param dockerImageName specified docker image name to run
*/
public AzureEventHubsEmulatorContainer(final String dockerImageName) {
this(DockerImageName.parse(dockerImageName));
}

/**
* @param dockerImageName specified docker image name to run
*/
public AzureEventHubsEmulatorContainer(final DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
waitingFor(Wait.forLogMessage(".*Emulator Service is Successfully Up!.*", 1));
withExposedPorts(DEFAULT_AMQP_PORT);
}

/**
* * Sets the Azurite dependency needed by the Event Hubs Container,
*
* @param azuriteContainer The Azurite container used by Event HUbs as a dependency
* @return this
*/
public AzureEventHubsEmulatorContainer withAzuriteContainer(final AzuriteContainer azuriteContainer) {
this.azuriteContainer = azuriteContainer;
dependsOn(this.azuriteContainer);
return this;
}

/**
* Provide the broker configuration to the container.
*
* @param config The file containing the broker configuration
* @return this
*/
public AzureEventHubsEmulatorContainer withConfig(final Transferable config) {
nagyesta marked this conversation as resolved.
Show resolved Hide resolved
withCopyToContainer(config, "/Eventhubs_Emulator/ConfigFiles/Config.json");
return this;
}

/**
* Accepts the EULA of the container.
*
* @return this
*/
public AzureEventHubsEmulatorContainer acceptLicense() {
return withEnv("ACCEPT_EULA", "Y");
}

/**
* Enables Kafka support.
*
* @return this
*/
public AzureEventHubsEmulatorContainer enableKafka() {
this.useKafka = true;
return this;
}

@Override
protected void configure() {
if (azuriteContainer == null) {
throw new IllegalStateException(
"The image " +
getDockerImageName() +
" requires an Azurite container. Please provide one with the withAzuriteContainer method!"
);
}
final String azuriteHost = azuriteContainer.getNetworkAliases().get(0);
withEnv("BLOB_SERVER", azuriteHost);
withEnv("METADATA_SERVER", azuriteHost);
// If license was not accepted programmatically, check if it was accepted via resource file
if (!getEnvMap().containsKey("ACCEPT_EULA")) {
LicenseAcceptance.assertLicenseAccepted(this.getDockerImageName());
acceptLicense();
}
if (this.useKafka) {
//Kafka must expose with the fixed default port or the broker's advertised port won't match
this.addFixedExposedPort(DEFAULT_KAFKA_PORT, DEFAULT_KAFKA_PORT);
}
}

/**
* Returns the connection string.
*
* @return connection string
*/
public String getConnectionString() {
return String.format(CONNECTION_STRING_FORMAT, getHost(), getMappedPort(DEFAULT_AMQP_PORT));
}

/**
* Returns the kafka bootstrap servers
*
* @return bootstrap servers
*/
public String getBootstrapServers() {
return String.format(BOOTSTRAP_SERVERS_FORMAT, getHost(), getMappedPort(DEFAULT_KAFKA_PORT));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package org.testcontainers.azure;

import com.azure.core.util.IterableStream;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubConsumerClient;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Rule;
import org.junit.Test;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.MountableFile;

import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Awaitility.waitAtMost;

public class AzureEventHubsEmulatorContainerTest {

@Rule
// network {
public Network network = Network.newNetwork();
// }

@Rule
// azuriteContainer {
public AzuriteContainer azuriteContainer = new AzuriteContainer("mcr.microsoft.com/azure-storage/azurite:3.33.0")
.withNetwork(network);
// }

@Rule
// emulatorContainer {
public AzureEventHubsEmulatorContainer emulator = new AzureEventHubsEmulatorContainer(
"mcr.microsoft.com/azure-messaging/eventhubs-emulator:2.0.1"
)
.acceptLicense()
.enableKafka() //optional
.withNetwork(network)
.withConfig(MountableFile.forClasspathResource("/eventhubs_config.json"))
.withAzuriteContainer(azuriteContainer);
// }

@Test
public void testWithEventHubsClient() {
try (
// createProducerAndConsumer {
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString(emulator.getConnectionString())
.fullyQualifiedNamespace("emulatorNs1")
.eventHubName("eh1")
.buildProducerClient();
EventHubConsumerClient consumer = new EventHubClientBuilder()
.connectionString(emulator.getConnectionString())
.fullyQualifiedNamespace("emulatorNs1")
.eventHubName("eh1")
.consumerGroup("cg1")
.buildConsumerClient()
// }
) {
producer.send(Collections.singletonList(new EventData("test")));

waitAtMost(Duration.ofSeconds(30))
.pollDelay(Duration.ofSeconds(5))
.untilAsserted(() -> {
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(
"0",
1,
EventPosition.earliest(),
Duration.ofSeconds(2)
);
Optional<PartitionEvent> event = events.stream().findFirst();
assertThat(event).isPresent();
assertThat(event.get().getData().getBodyAsString()).isEqualTo("test");
});
}
}

@Test
public void testWithKafkaClient() throws Exception {
// kafkaProperties {
ImmutableMap<String, String> commonProperties = ImmutableMap
.<String, String>builder()
.put("bootstrap.servers", emulator.getBootstrapServers())
.put("sasl.mechanism", "PLAIN")
.put("security.protocol", "SASL_PLAINTEXT")
.put(
"sasl.jaas.config",
String.format(
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";",
emulator.getConnectionString()
)
)
.build();
// }

Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
producerProperties.putAll(commonProperties);

Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID());
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.putAll(commonProperties);

try (
KafkaProducer<String, String> producer = new KafkaProducer<>(
producerProperties,
new StringSerializer(),
new StringSerializer()
);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
consumerProperties,
new StringDeserializer(),
new StringDeserializer()
);
) {
String topicName = "eh1";
consumer.subscribe(Collections.singletonList(topicName));

producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();

await()
.atMost(Duration.ofSeconds(10))
.untilAsserted(() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

assertThat(records)
.hasSize(1)
.extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
.containsExactly(tuple(topicName, "testcontainers", "rulezzz"));
});

consumer.unsubscribe();
}
}
}
24 changes: 24 additions & 0 deletions modules/azure/src/test/resources/eventhubs_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"UserConfig": {
"NamespaceConfig": [
{
"Type": "EventHub",
"Name": "emulatorNs1",
"Entities": [
{
"Name": "eh1",
"PartitionCount": "1",
"ConsumerGroups": [
{
"Name": "cg1"
}
]
}
]
}
],
"LoggingConfig": {
"Type": "File"
}
}
}
Loading