Skip to content

Commit

Permalink
Unexport constructors with X prefix and also module-level exported …
Browse files Browse the repository at this point in the history
…functions (#140)

* Remove X-prefix from constructors
* Unexport module-level exported functions
  • Loading branch information
mostafa authored Jul 13, 2022
1 parent 26dbd65 commit 47cdfe5
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 40 deletions.
12 changes: 6 additions & 6 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ type ConsumeConfig struct {
ValueSchema string `json:"valueSchema"`
}

// XReader is a wrapper around kafkago.Reader and acts as a JS constructor
// readerClass is a wrapper around kafkago.reader and acts as a JS constructor
// for this extension, thus it must be called with new operator, e.g. new Reader(...).
// nolint: funlen
func (k *Kafka) XReader(call goja.ConstructorCall) *goja.Object {
func (k *Kafka) readerClass(call goja.ConstructorCall) *goja.Object {
runtime := k.vu.Runtime()
var readerConfig *ReaderConfig
if len(call.Arguments) == 0 {
Expand Down Expand Up @@ -236,8 +236,8 @@ func (k *Kafka) reader(readerConfig *ReaderConfig) *kafkago.Reader {
return reader
}

// GetDeserializer returns the deserializer for the given schema.
func (k *Kafka) GetDeserializer(schema string) Deserializer {
// getDeserializer returns the deserializer for the given schema.
func (k *Kafka) getDeserializer(schema string) Deserializer {
if de, ok := k.deserializerRegistry.Registry[schema]; ok {
return de.GetDeserializer()
}
Expand Down Expand Up @@ -271,8 +271,8 @@ func (k *Kafka) consume(
logger.WithField("error", err).Warn("Using default string serializers")
}

keyDeserializer := k.GetDeserializer(consumeConfig.Config.Consumer.KeyDeserializer)
valueDeserializer := k.GetDeserializer(consumeConfig.Config.Consumer.ValueDeserializer)
keyDeserializer := k.getDeserializer(consumeConfig.Config.Consumer.KeyDeserializer)
valueDeserializer := k.getDeserializer(consumeConfig.Config.Consumer.ValueDeserializer)

messages := make([]map[string]interface{}, 0)

Expand Down
4 changes: 2 additions & 2 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ func initializeConsumerTest(t *testing.T) (*kafkaTest, *kafkago.Writer) {
test := GetTestModuleInstance(t)

// Create a Kafka topic
connection := test.module.Kafka.GetKafkaControllerConnection(&ConnectionConfig{
connection := test.module.Kafka.getKafkaControllerConnection(&ConnectionConfig{
Address: "localhost:9092",
})
defer connection.Close()

test.module.Kafka.CreateTopic(connection, &kafkago.TopicConfig{
test.module.Kafka.createTopic(connection, &kafkago.TopicConfig{
Topic: "test-topic",
})

Expand Down
6 changes: 3 additions & 3 deletions module.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ func (*RootModule) NewModuleInstance(virtualUser modules.VU) modules.Instance {

// Export the functions from the Kafka module to the JS code.
// The Writer is a constructor and must be called with new, e.g. new Writer(...).
mustExport("Writer", moduleInstance.XWriter)
mustExport("Writer", moduleInstance.writerClass)
// The Reader is a constructor and must be called with new, e.g. new Reader(...).
mustExport("Reader", moduleInstance.XReader)
mustExport("Reader", moduleInstance.readerClass)
// The Connection is a constructor and must be called with new, e.g. new Connection(...).
mustExport("Connection", moduleInstance.XConnection)
mustExport("Connection", moduleInstance.connectionClass)

// This causes the struct fields to be exported to the native (camelCases) JS code.
virtualUser.Runtime().SetFieldNameMapper(goja.TagFieldNameMapper("json", true))
Expand Down
12 changes: 6 additions & 6 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ type ProduceConfig struct {
ValueSchema string `json:"valueSchema"`
}

// XWriter is a wrapper around kafkago.Writer and acts as a JS constructor
// writerClass is a wrapper around kafkago.writer and acts as a JS constructor
// for this extension, thus it must be called with new operator, e.g. new Writer(...).
// nolint: funlen
func (k *Kafka) XWriter(call goja.ConstructorCall) *goja.Object {
func (k *Kafka) writerClass(call goja.ConstructorCall) *goja.Object {
runtime := k.vu.Runtime()
var writerConfig *WriterConfig
if len(call.Arguments) == 0 {
Expand Down Expand Up @@ -209,8 +209,8 @@ func (k *Kafka) writer(writerConfig *WriterConfig) *kafkago.Writer {
return writer
}

// GetSerializer returns the serializer for the given schema.
func (k *Kafka) GetSerializer(schema string) Serializer {
// getSerializer returns the serializer for the given schema.
func (k *Kafka) getSerializer(schema string) Serializer {
if ser, ok := k.serializerRegistry.Registry[schema]; ok {
return ser.GetSerializer()
}
Expand Down Expand Up @@ -239,8 +239,8 @@ func (k *Kafka) produce(writer *kafkago.Writer, produceConfig *ProduceConfig) {
logger.WithField("error", err).Warn("Using default string serializers")
}

keySerializer := k.GetSerializer(produceConfig.Config.Producer.KeySerializer)
valueSerializer := k.GetSerializer(produceConfig.Config.Producer.ValueSerializer)
keySerializer := k.getSerializer(produceConfig.Config.Producer.KeySerializer)
valueSerializer := k.getSerializer(produceConfig.Config.Producer.ValueSerializer)

kafkaMessages := make([]kafkago.Message, len(produceConfig.Messages))
for index, message := range produceConfig.Messages {
Expand Down
4 changes: 2 additions & 2 deletions producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ func TestProduceWithoutKey(t *testing.T) {

// Create a topic before producing messages, otherwise tests will fail.
assert.NotPanics(t, func() {
connection := test.module.GetKafkaControllerConnection(&ConnectionConfig{
connection := test.module.getKafkaControllerConnection(&ConnectionConfig{
Address: "localhost:9092",
})
test.module.CreateTopic(connection, &kafkago.TopicConfig{
test.module.createTopic(connection, &kafkago.TopicConfig{
Topic: "test-topic",
NumPartitions: 1,
ReplicationFactor: 1,
Expand Down
28 changes: 14 additions & 14 deletions topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ type ConnectionConfig struct {
TLS TLSConfig `json:"tls"`
}

// XConnection is a constructor for the Connection object in JS
// connectionClass is a constructor for the Connection object in JS
// that creates a new connection for creating, listing and deleting topics,
// e.g. new Connection(...).
// nolint: funlen
func (k *Kafka) XConnection(call goja.ConstructorCall) *goja.Object {
func (k *Kafka) connectionClass(call goja.ConstructorCall) *goja.Object {
runtime := k.vu.Runtime()
var connectionConfig *ConnectionConfig
if len(call.Arguments) == 0 {
Expand All @@ -37,7 +37,7 @@ func (k *Kafka) XConnection(call goja.ConstructorCall) *goja.Object {
}
}

connection := k.GetKafkaControllerConnection(connectionConfig)
connection := k.getKafkaControllerConnection(connectionConfig)

connectionObject := runtime.NewObject()
// This is the connection object itself
Expand All @@ -61,7 +61,7 @@ func (k *Kafka) XConnection(call goja.ConstructorCall) *goja.Object {
}
}

k.CreateTopic(connection, topicConfig)
k.createTopic(connection, topicConfig)
return goja.Undefined()
})
if err != nil {
Expand All @@ -73,7 +73,7 @@ func (k *Kafka) XConnection(call goja.ConstructorCall) *goja.Object {
if topic, ok := call.Argument(0).Export().(string); !ok {
common.Throw(runtime, ErrNotEnoughArguments)
} else {
k.DeleteTopic(connection, topic)
k.deleteTopic(connection, topic)
}
}

Expand All @@ -84,7 +84,7 @@ func (k *Kafka) XConnection(call goja.ConstructorCall) *goja.Object {
}

err = connectionObject.Set("listTopics", func(call goja.FunctionCall) goja.Value {
topics := k.ListTopics(connection)
topics := k.listTopics(connection)
return runtime.ToValue(topics)
})
if err != nil {
Expand All @@ -105,10 +105,10 @@ func (k *Kafka) XConnection(call goja.ConstructorCall) *goja.Object {
return connectionObject
}

// GetKafkaControllerConnection returns a kafka controller connection with a given node address.
// getKafkaControllerConnection returns a kafka controller connection with a given node address.
// It will also try to use the auth and TLS settings to create a secure connection. The connection
// should be closed after use.
func (k *Kafka) GetKafkaControllerConnection(connectionConfig *ConnectionConfig) *kafkago.Conn {
func (k *Kafka) getKafkaControllerConnection(connectionConfig *ConnectionConfig) *kafkago.Conn {
dialer, wrappedError := GetDialer(connectionConfig.SASL, connectionConfig.TLS)
if wrappedError != nil {
logger.WithField("error", wrappedError).Error(wrappedError)
Expand Down Expand Up @@ -154,10 +154,10 @@ func (k *Kafka) GetKafkaControllerConnection(connectionConfig *ConnectionConfig)
return controllerConn
}

// CreateTopic creates a topic with the given name, partitions, replication factor and compression.
// createTopic creates a topic with the given name, partitions, replication factor and compression.
// It will also try to use the auth and TLS settings to create a secure connection. If the topic
// already exists, it will do no-op.
func (k *Kafka) CreateTopic(conn *kafkago.Conn, topicConfig *kafkago.TopicConfig) {
func (k *Kafka) createTopic(conn *kafkago.Conn, topicConfig *kafkago.TopicConfig) {
if topicConfig.NumPartitions <= 0 {
topicConfig.NumPartitions = 1
}
Expand All @@ -174,10 +174,10 @@ func (k *Kafka) CreateTopic(conn *kafkago.Conn, topicConfig *kafkago.TopicConfig
}
}

// DeleteTopic deletes the given topic from the given address. It will also try to
// deleteTopic deletes the given topic from the given address. It will also try to
// use the auth and TLS settings to create a secure connection. If the topic
// does not exist, it will raise an error.
func (k *Kafka) DeleteTopic(conn *kafkago.Conn, topic string) {
func (k *Kafka) deleteTopic(conn *kafkago.Conn, topic string) {
err := conn.DeleteTopics([]string{topic}...)
if err != nil {
wrappedError := NewXk6KafkaError(failedDeleteTopic, "Failed to delete topic.", err)
Expand All @@ -186,10 +186,10 @@ func (k *Kafka) DeleteTopic(conn *kafkago.Conn, topic string) {
}
}

// ListTopics lists the topics from the given address. It will also try to
// listTopics lists the topics from the given address. It will also try to
// use the auth and TLS settings to create a secure connection. If the topic
// does not exist, it will raise an error.
func (k *Kafka) ListTopics(conn *kafkago.Conn) []string {
func (k *Kafka) listTopics(conn *kafkago.Conn) []string {
partitions, err := conn.ReadPartitions()
if err != nil {
wrappedError := NewXk6KafkaError(failedReadPartitions, "Failed to read partitions.", err)
Expand Down
14 changes: 7 additions & 7 deletions topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func TestGetKafkaControllerConnection(t *testing.T) {
test := GetTestModuleInstance(t)
assert.NotPanics(t, func() {
connection := test.module.Kafka.GetKafkaControllerConnection(&ConnectionConfig{
connection := test.module.Kafka.getKafkaControllerConnection(&ConnectionConfig{
Address: "localhost:9092",
})
assert.NotNil(t, connection)
Expand All @@ -26,7 +26,7 @@ func TestGetKafkaControllerConnectionFails(t *testing.T) {
test := GetTestModuleInstance(t)

assert.Panics(t, func() {
connection := test.module.Kafka.GetKafkaControllerConnection(&ConnectionConfig{
connection := test.module.Kafka.getKafkaControllerConnection(&ConnectionConfig{
Address: "localhost:9094",
})
assert.Nil(t, connection)
Expand All @@ -39,20 +39,20 @@ func TestTopics(t *testing.T) {

require.NoError(t, test.moveToVUCode())
assert.NotPanics(t, func() {
connection := test.module.Kafka.GetKafkaControllerConnection(&ConnectionConfig{
connection := test.module.Kafka.getKafkaControllerConnection(&ConnectionConfig{
Address: "localhost:9092",
})

test.module.Kafka.CreateTopic(connection, &kafkago.TopicConfig{
test.module.Kafka.createTopic(connection, &kafkago.TopicConfig{
Topic: "test-topic",
})

topics := test.module.Kafka.ListTopics(connection)
topics := test.module.Kafka.listTopics(connection)
assert.Contains(t, topics, "test-topic")

test.module.Kafka.DeleteTopic(connection, "test-topic")
test.module.Kafka.deleteTopic(connection, "test-topic")

topics = test.module.Kafka.ListTopics(connection)
topics = test.module.Kafka.listTopics(connection)
assert.NotContains(t, topics, "test-topic")

connection.Close()
Expand Down

0 comments on commit 47cdfe5

Please sign in to comment.