Skip to content

Commit

Permalink
Add back in write to another topic and read from multiple
Browse files Browse the repository at this point in the history
  • Loading branch information
rycowhi committed Mar 11, 2024
1 parent 8ad12b1 commit e601a25
Showing 1 changed file with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ class KafkaSinkSpec
EmbeddedKafka.stop()
}

it should "support Kafka as a write source" in {
it should "support Kafka as a write source while reading from multiple Kafka read sources" in {
val topicName = "bananas"
val otherTopicName = "anotherTopic"

withNewSparkSession { implicit spark =>
withLineageTracking { captor =>
Expand All @@ -74,9 +75,19 @@ class KafkaSinkSpec
.option("topic", topicName)
.save())

// Write to another topic seeding lineage for a downstream read
(_, _) <- captor.lineageOf(
testData
.selectExpr("CAST (name AS STRING) AS value")
.write
.format("kafka")
.option("kafka.bootstrap.servers", kafkaUrl)
.option("topic", otherTopicName)
.save())

(plan2, _) <- captor.lineageOf(
reader
.option("subscribe", s"$topicName")
.option("subscribe", s"$topicName,anotherTopic")
.load()
.write.mode(Overwrite).save(TempFile(pathOnly = true).deleteOnExit().path.toString))

Expand All @@ -98,6 +109,7 @@ class KafkaSinkSpec

plan2.operations.reads.head.extra("sourceType") shouldBe Some("kafka")
plan2.operations.reads.head.inputSources should contain(s"kafka:$topicName")
plan2.operations.reads.head.inputSources should contain(s"kafka:$otherTopicName")
plan2.operations.reads.head.params should contain key "subscribe"

plan3.operations.reads.head.extra("sourceType") shouldBe Some("kafka")
Expand Down

0 comments on commit e601a25

Please sign in to comment.