Write your `RDD`s and `DStream`s to Kafka seamlessly
spark-kafka-writer is available on Maven Central. Pick the coordinates matching your Kafka version (0.8 or 0.10) and your Spark version:
| | Kafka 0.8 | Kafka 0.10 |
|---|---|---|
| Spark 1.6.X | `"com.github.benfradet" %% "spark-kafka-writer" % "0.1.0"` | ❌ |
| Spark 2.0.X | `"com.github.benfradet" %% "spark-kafka-0-8-writer" % "0.2.0"` | `"com.github.benfradet" %% "spark-kafka-0-10-writer" % "0.2.0"` |
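For example, to pull in the Spark 2.0.X / Kafka 0.10 artifact from the table above, add its coordinates to your build (a minimal `build.sbt` line, using the version from the table):

```scala
libraryDependencies += "com.github.benfradet" %% "spark-kafka-0-10-writer" % "0.2.0"
```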
- If you want to save an `RDD` to Kafka:
```scala
import java.util.Properties

import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.rdd.RDD

// replace kafka010 by kafka08 if you're using Kafka 0.8
import com.github.benfradet.spark.kafka010.writer._

val topic = "my-topic"

val producerConfig = {
  val p = new Properties()
  p.setProperty("bootstrap.servers", "127.0.0.1:9092")
  p.setProperty("key.serializer", classOf[StringSerializer].getName)
  p.setProperty("value.serializer", classOf[StringSerializer].getName)
  p
}

val rdd: RDD[String] = ...

rdd.writeToKafka(
  producerConfig,
  s => new ProducerRecord[String, String](topic, s)
)
```
- If you want to save a `DStream` to Kafka:
```scala
import java.util.Properties

import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.streaming.dstream.DStream

// replace kafka010 by kafka08 if you're using Kafka 0.8
import com.github.benfradet.spark.kafka010.writer._

val topic = "my-topic"

val producerConfig = {
  val p = new Properties()
  p.setProperty("bootstrap.servers", "127.0.0.1:9092")
  p.setProperty("key.serializer", classOf[StringSerializer].getName)
  p.setProperty("value.serializer", classOf[StringSerializer].getName)
  p
}

val dStream: DStream[String] = ...

dStream.writeToKafka(
  producerConfig,
  s => new ProducerRecord[String, String](topic, s)
)
```
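Both snippets above build the same `producerConfig` by hand. If you share producer settings across jobs, a small helper can remove the duplication; `propsFromMap` below is a hypothetical convenience function, not part of spark-kafka-writer, and uses only the JDK:

```scala
import java.util.Properties

// Hypothetical helper (not part of the library): build a java.util.Properties
// producer config from an immutable Scala Map.
def propsFromMap(config: Map[String, String]): Properties = {
  val p = new Properties()
  config.foreach { case (k, v) => p.setProperty(k, v) }
  p
}

val producerConfig = propsFromMap(Map(
  "bootstrap.servers" -> "127.0.0.1:9092",
  "key.serializer"    -> "org.apache.kafka.common.serialization.StringSerializer",
  "value.serializer"  -> "org.apache.kafka.common.serialization.StringSerializer"
))
```

The resulting `Properties` can be passed to `writeToKafka` exactly like the hand-built version.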
You can find the full Scaladoc at https://benfradet.github.io/spark-kafka-writer.
The original code was written by Hari Shreedharan.