当前位置:   article > 正文

Flink1.14新版KafkaSource和KafkaSink实践使用(自定义反序列化器、Topic选择器、序列化器、分区器)

kafkasink

前言

在官方文档的描述中,API FlinkKafkaConsumer和FlinkKafkaProducer将在后续版本陆续弃用、移除,所以在未来生产中有版本升级的情况下,新API KafkaSource和KafkaSink还是有必要学会使用的。下面介绍下基于新API的一些自定义类以及主程序的简单实践。

官方案例

官方文档地址:
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/datastream/kafka/

KafkaSource的自定义类

自定义反序列化器

自定义反序列化器可以以指定的格式取到来源Kafka消息中我们想要的元素。该类需要继承 KafkaDeserializationSchema ,这里简单将来源Kafka的topic、key、value以Tuple3[String, String, String]的格式取出来。

MyKafkaDeserializationSchemaTuple3.scala

  1. import org.apache.flink.api.common.typeinfo.TypeInformation
  2. import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
  3. import org.apache.kafka.clients.consumer.ConsumerRecord
  4. import java.nio.charset.StandardCharsets
  5. /**
  6. * @author hushhhh
  7. */
  8. class MyKafkaDeserializationSchemaTuple3 extends KafkaDeserializationSchema[(String, String, String)] {
  9. override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String, String) = {
  10. new Tuple3[String, String, String](
  11. record.topic(),
  12. new String(record.key(), StandardCharsets.UTF_8),
  13. new String(record.value(), StandardCharsets.UTF_8))
  14. }
  15. override def isEndOfStream(nextElement: (String, String, String)): Boolean = false
  16. override def getProducedType: TypeInformation[(String, String, String)] = {
  17. TypeInformation.of(classOf[(String, String, String)])
  18. }
  19. }

KafkaSink的自定义类

自定义Topic选择器

自定义一个 TopicSelector 可以将流中多个topic里的数据根据一定逻辑分发到不同的目标topic里。该类需要继承 TopicSelector ,这里简单根据来源Kafka的topic名拼接下。

MyTopicSelector.scala

  1. import org.apache.flink.connector.kafka.sink.TopicSelector
  2. /**
  3. * @author hushhhh
  4. */
  5. class MyTopicSelector extends TopicSelector[(String, String, String)] {
  6. override def apply(t: (String, String, String)): String = {
  7.     // t: 来源kafka的topic、key、value
  8. "TOPIC_" + t._1.toUpperCase()
  9. }
  10. }

自定义序列化器

自定义序列化器可以将数据根据自己的业务格式写到目标Kafka的key和value里,这里将来源Kafka里的key和value直接写出去,这两个类都需要继承 SerializationSchema 。

ProducerRecord Key的序列化器

MyKeySerializationSchema.scala

  1. import org.apache.flink.api.common.serialization.SerializationSchema
  2. /**
  3. * @author hushhhh
  4. */
  5. class MyKeySerializationSchema extends SerializationSchema[(String, String, String)] {
  6. override def serialize(element: (String, String, String)): Array[Byte] = {
  7.     // element: 来源kafka的topic、key、value
  8. element._2.getBytes()
  9. }
  10. }

ProducerRecord Value的序列化器

MyValueSerializationSchema.scala

  1. import org.apache.flink.api.common.serialization.SerializationSchema
  2. /**
  3. * @author hushhhh
  4. */
  5. class MyValueSerializationSchema extends SerializationSchema[(String, String, String)] {
  6. override def serialize(element: (String, String, String)): Array[Byte] = {
  7.     // element: 来源kafka的topic、key、value
  8. element._3.getBytes()
  9. }
  10. }

自定义分区器

自定义分区器可以根据具体逻辑对要写到目标Kafka 里的数据进行partition分配。该类需要继承 FlinkKafkaPartitioner ,这里根据key的hash分配到不同的partition里(如果目标topic有多个partition的话)。

MyPartitioner.scala

  1. import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner
  2. /**
  3. * @author hushhhh
  4. */
  5. class MyPartitioner extends FlinkKafkaPartitioner[(String, String, String)] {
  6. override def partition(record: (String, String, String), key: Array[Byte], value: Array[Byte], targetTopic: String, partitions: Array[Int]): Int = {
  7.     // record: 来源kafka的topic、key、value
  8. Math.abs(new String(record._2).hashCode % partitions.length)
  9. }
  10. }

主类

Main.scala

  1. import format.{MyKafkaDeserializationSchemaTuple3, MyKeySerializationSchema, MyPartitioner, MyTopicSelector, MyValueSerializationSchema}
  2. import org.apache.flink.api.common.eventtime.WatermarkStrategy
  3. import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
  4. import org.apache.flink.api.scala._
  5. import org.apache.flink.connector.base.DeliveryGuarantee
  6. import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
  7. import org.apache.flink.connector.kafka.source.KafkaSource
  8. import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
  9. import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
  10. import org.apache.kafka.clients.consumer.OffsetResetStrategy
  11. import java.util.Properties
  12. import scala.collection.JavaConverters._
  13. /**
  14. * @author hushhhh
  15. */
  16. object Main {
  17. def main(args: Array[String]): Unit = {
  18. /**
  19. * env
  20. */
  21. // stream环境
  22. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  23. /**
  24. * source
  25. */
  26. // 定义 KafkaSource
  27. lazy val kafkaSource: KafkaSource[(String, String, String)] = KafkaSource.builder()
  28. // Kafka消费者的各种配置文件,此处省略配置
  29. .setProperties(new Properties())
  30. // 配置消费的一个或多个topic
  31. .setTopics("sourceTopic1,sourceTopic2,...".split(",", -1).toList.asJava)
  32. // 开始消费位置,从已提交的offset开始消费,没有的话从最新的消息开始消费
  33. .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
  34. // 反序列化,使用之前我们自定义的反序列化器
  35. .setDeserializer(KafkaRecordDeserializationSchema.of(new MyKafkaDeserializationSchemaTuple3))
  36. .build()
  37. // 添加 kafka source
  38. val inputDS: DataStream[(String, String, String)] = env.fromSource(
  39. kafkaSource,
  40. WatermarkStrategy.noWatermarks(),
  41. "MyKafkaSource")
  42. .setParallelism(1)
  43. /**
  44. * transformation
  45. */
  46. // 数据加工处理,此处省略
  47. /**
  48. * sink
  49. */
  50.     // 定义 KafkaSink
  51. lazy val kafkaSink: KafkaSink[(String, String, String)] =
  52. KafkaSink.builder[(String, String, String)]()
  53. // 目标集群地址
  54. .setBootstrapServers("bootstrap.servers")
  55. // Kafka生产者的各种配置文件,此处省略配置
  56. .setKafkaProducerConfig(new Properties())
  57. // 定义消息的序列化模式
  58. .setRecordSerializer(KafkaRecordSerializationSchema.builder()
  59. // Topic选择器,使用之前我们自定义的Topic选择器
  60. .setTopicSelector(new MyTopicSelector)
  61. // Key的序列化器,使用之前我们自定义的Key序列化器
  62. .setKeySerializationSchema(new MyKeySerializationSchema)
  63. // Value的序列化器,使用之前我们自定义的Value序列化器
  64. .setValueSerializationSchema(new MyValueSerializationSchema)
  65. // 自定义分区器,使用之前我们自定义的自定义分区器
  66. .setPartitioner(new MyPartitioner)
  67. .build()
  68. )
  69. // 语义保证,保证至少一次
  70. .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
  71. .build()
  72. // 添加 kafka sink
  73. inputDS.sinkTo(kafkaSink)
  74. .name("MyKafkaSink")
  75. .setParallelism(1)
  76. /**
  77. * execute
  78. */
  79. env.execute("myJob")
  80. }
  81. }

以上就是KafkaSource和KafkaSink API的简单使用。大佬们感觉有用的话点个赞吧~

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/44166
推荐阅读
相关标签