赞
踩
addSink
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
创建StreamSink对象
StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
AbstractUdfStreamOperator下的
/**
 * Opens this operator: first delegates to the superclass {@code open()}, then opens the
 * wrapped user function via {@code FunctionUtils.openFunction} with a freshly created,
 * empty {@code Configuration}.
 *
 * @throws Exception propagated from the superclass or from opening the user function
 */
public void open() throws Exception {
    super.open();

    // The user function is opened with a brand-new configuration object.
    Configuration parameters = new Configuration();
    FunctionUtils.openFunction(userFunction, parameters);
}
执行FlinkKafkaProducerBase
FlinkKafkaProducer010 几种模式
FlinkKafkaPartitioner 用于让用户自定义分区方式
public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig) public FlinkKafkaProducer010( String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner<T> customPartitioner) public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema) public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) public FlinkKafkaProducer010( String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner<T> customPartitioner)
判断用户是否自定义了分区器
if (null != flinkKafkaPartitioner) {
if (flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) {
((FlinkKafkaDelegatePartitioner) flinkKafkaPartitioner).setPartitions(
getPartitionsByTopic(this.defaultTopicId, this.producer));
}
flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
}
根据topic获取当前topic的分区
protected static int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) { // the fetched list is immutable, so we're creating a mutable copy in order to sort it List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic)); // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks Collections.sort(partitionsList, new Comparator<PartitionInfo>() { @Override public int compare(PartitionInfo o1, PartitionInfo o2) { return Integer.compare(o1.partition(), o2.partition()); } }); int[] partitions = new int[partitionsList.size()]; for (int i = 0; i < partitions.length; i++) { partitions[i] = partitionsList.get(i).partition(); } return partitions; }
分区的存储格式
当用户没有自定义分区器时，使用默认的分区器 FlinkFixedPartitioner
/**
 * Chooses the target partition for a record.
 *
 * <p>The result depends only on {@code parallelInstanceId} and the supplied partition
 * array; {@code record}, {@code key}, {@code value} and {@code targetTopic} are not
 * consulted here.
 *
 * @param record the record being written (unused)
 * @param key the serialized key (unused)
 * @param value the serialized value (unused)
 * @param targetTopic the topic being written to (unused)
 * @param partitions the partition ids of the target topic; must be non-null and non-empty,
 *     otherwise the precondition check fails
 * @return the element of {@code partitions} at index
 *     {@code parallelInstanceId % partitions.length}
 */
@Override
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    final boolean hasPartitions = partitions != null && partitions.length > 0;
    Preconditions.checkArgument(hasPartitions, "Partitions of the target topic is empty.");

    // Assign by remainder: this instance's id modulo the number of partitions.
    final int slot = parallelInstanceId % partitions.length;
    return partitions[slot];
}
是通过 FlinkKafkaProducerBase 调用 flinkKafkaPartitioner 的 open 方法，然后由 FlinkFixedPartitioner 中重写（override）的 open 来处理；上面即为重写的 open，调用处如下：
// Opens the partitioner with the two values reported by ctx: the index of this subtask
// and the number of parallel subtasks.
flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。