
Using Flink to sync Kafka data into a Delta data lake


1. Introduction

The previous article showed a simple way to sync MySQL data into Delta with Flink. This one does the same for Kafka. Versions used:

  • Flink 1.13.0
  • delta 1.0.0
  • flink-connector-kafka_2.12 1.13.0

2. Code for ingesting Kafka data into the lake

2.1 Flink execution environment

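Set the checkpoint interval. As we understand the Delta connector, DeltaSink commits new files to the Delta table only when a checkpoint completes. This interval therefore also bounds how quickly data becomes visible in the table.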

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// take a checkpoint every 5000 ms
env.enableCheckpointing(5000);

2.2 Building the Kafka source

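Building the Kafka source requires three things: the topic, the Kafka broker address (bootstrap.servers) and a consumer group id (group.id). Messages are decoded by the SimpleStringSchema passed to FlinkKafkaConsumer. The consumer replaces any configured key.deserializer/value.deserializer with its own byte-array deserializers, so those two properties are not needed.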

public static Properties getProperties(String groupId, String bootstrapServers){
    Properties properties = new Properties();
    // Kafka broker address(es), e.g. "ip:9092"
    properties.put("bootstrap.servers", bootstrapServers);
    // consumer group id
    properties.put("group.id", groupId);
    // key.deserializer / value.deserializer are left out: FlinkKafkaConsumer overrides them,
    // and the messages are decoded by the SimpleStringSchema below
    return properties;
}
// building the source then takes a single line
Properties properties = getProperties(groupId, bootstrapServers);
FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
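By default, FlinkKafkaConsumer starts from the consumer group's committed offsets. For partitions with no committed offset, it falls back to Kafka's auto.offset.reset setting, whose Kafka default is "latest". With a brand-new group, messages sent by the script in 2.7 before the job starts would therefore be skipped.

The sketch below is a properties builder that sets auto.offset.reset explicitly and validates its inputs. KafkaConsumerProps and build are hypothetical names, not part of the original project.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not from the original repo): consumer Properties for
// FlinkKafkaConsumer, with an explicit auto.offset.reset
final class KafkaConsumerProps {

    // Values accepted by Kafka's auto.offset.reset setting
    private static final List<String> OFFSET_RESET_VALUES = Arrays.asList("earliest", "latest", "none");

    private KafkaConsumerProps() {}

    static Properties build(String bootstrapServers, String groupId, String autoOffsetReset) {
        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
            throw new IllegalArgumentException("bootstrapServers must not be empty");
        }
        if (groupId == null || groupId.isEmpty()) {
            throw new IllegalArgumentException("groupId must not be empty");
        }
        if (!OFFSET_RESET_VALUES.contains(autoOffsetReset)) {
            throw new IllegalArgumentException("unsupported auto.offset.reset: " + autoOffsetReset);
        }
        Properties props = new Properties();
        // Broker list, e.g. "host1:9092,host2:9092"
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Consumer group whose committed offsets the source starts from by default
        props.setProperty("group.id", groupId);
        // Consulted only for partitions where the group has no committed offset yet
        props.setProperty("auto.offset.reset", autoOffsetReset);
        return props;
    }
}
```

In the job, the result would replace the properties argument, e.g. `new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), KafkaConsumerProps.build("ip:9092", "delta-demo", "earliest"))`.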

2.3 Converting the Kafka message schema into a Flink RowType

As with MySQL, writing the data into the lake with Flink requires describing the Kafka message format as a Flink RowType.

This is done with RowType.RowField. Here each Kafka message consists of four fields:

public static RowType getKafkaRowType(){
    return new RowType(Arrays.asList(
        new RowType.RowField("userId", new VarCharType(VarCharType.MAX_LENGTH)),
        new RowType.RowField("stationTime", new VarCharType(VarCharType.MAX_LENGTH)),
        new RowType.RowField("score", new IntType()),
        new RowType.RowField("localTime", new VarCharType(VarCharType.MAX_LENGTH))));
}
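The JSON keys in the Kafka messages are snake_case (user_id, station_time, score, local_time), while the RowType fields above are camelCase. Section 2.5 maps them by hand.

The stdlib-only sketch below derives one name from the other while keeping the schema order. FieldNames, snakeToCamel and mapping are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: RowType field name (camelCase) from a Kafka JSON key (snake_case)
final class FieldNames {

    private FieldNames() {}

    // "station_time" -> "stationTime": drop each '_' and upper-case the next character
    static String snakeToCamel(String key) {
        StringBuilder sb = new StringBuilder(key.length());
        boolean upperNext = false;
        for (char c : key.toCharArray()) {
            if (c == '_') {
                upperNext = true;
            } else if (upperNext) {
                sb.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // JSON key -> RowType field name; LinkedHashMap keeps the schema order
    static Map<String, String> mapping(String... jsonKeys) {
        Map<String, String> m = new LinkedHashMap<>();
        for (String k : jsonKeys) {
            m.put(k, snakeToCamel(k));
        }
        return m;
    }
}
```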

2.4 Building the sink

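Use DeltaSink from the delta-flink dependency. Its .forRowData() method takes three arguments: the table path (lakePath), a Hadoop Configuration and the RowType. Calling .build() then produces the sink. The Hadoop configuration here only sets Snappy compression for the Parquet data files.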

public static org.apache.hadoop.conf.Configuration getHadoopConf() {
    org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
    conf.set("parquet.compression", "SNAPPY");
    return conf;
}

public static DeltaSink<RowData> createDeltaSink(String deltaTablePath, RowType rowType) {
    return DeltaSink
        .forRowData(
        new Path(deltaTablePath),
        getHadoopConf(),
        rowType).build();
}
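The sink makes data visible by committing to the table's transaction log under `<lakePath>/_delta_log`. As we understand it, this happens when a Flink checkpoint completes. A simple way to check that the job is writing is to look for commit files there.

Per the Delta Lake protocol, each commit file is named after the table version, zero-padded to 20 digits, with a `.json` suffix. The hypothetical DeltaLogFiles helper below only computes those names. It does not read the log.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Locale;

// Hypothetical helper: expected commit file names in a Delta table's _delta_log
final class DeltaLogFiles {

    private DeltaLogFiles() {}

    // Table version -> "<20-digit zero-padded version>.json"
    static String commitFileName(long version) {
        if (version < 0) {
            throw new IllegalArgumentException("version must be >= 0");
        }
        // Locale.ROOT keeps ASCII digits regardless of the default locale
        return String.format(Locale.ROOT, "%020d.json", version);
    }

    // <tablePath>/_delta_log/<commit file name>
    static Path commitFile(String tablePath, long version) {
        return Paths.get(tablePath, "_delta_log", commitFileName(version));
    }
}
```

After the first successful commit, `commitFile(lakePathNoPartition, 0)` should exist.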

2.5 Converting String to RowData

The source emits String values while the sink expects RowData, so a map function is needed in between.

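fastjson reads each field from the JSON message, and the values are packed into a Flink Row. A DataFormatConverter then turns the Row into RowData. The Row is matched to the RowType by position, not by name, so the values must be listed in the RowType order. This matters because the JSON keys are snake_case (user_id, ...) while the RowType fields are camelCase.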

// DataFormatConverters is in the flink-table-runtime-blink_2.12 dependency
public static final DataFormatConverters.DataFormatConverter<RowData, Row> CONVERTER =
    DataFormatConverters.getConverterForDataType(
    TypeConversions.fromLogicalToDataType(getKafkaRowType())
);
public static RowData kafkaJsonToRowData(String line){
    // parse the message once, then read the snake_case keys
    JSONObject json = JSON.parseObject(line);
    String userId = json.getString("user_id");
    String stationTime = json.getString("station_time");
    Integer score = json.getInteger("score");
    String localTime = json.getString("local_time");
    // same order as getKafkaRowType(): userId, stationTime, score, localTime
    Row row = Row.of(userId, stationTime, score, localTime);
    return CONVERTER.toInternal(row);
}
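The essential part of kafkaJsonToRowData is putting the values in exactly the RowType order with matching Java types: String for VarCharType and Integer for IntType.

Below is a stdlib-only sketch of that step. A plain Map stands in for the parsed fastjson JSONObject, and KafkaRecordMapper/toOrderedValues are hypothetical names. In the job, the returned array would be passed to `Row.of(...)` before `CONVERTER.toInternal(...)`. The script in 2.7 sends user_id as a number, so the sketch stringifies it for the VarChar column.

```java
import java.util.Map;

// Hypothetical stand-in for the field extraction in kafkaJsonToRowData;
// the Map plays the role of the parsed JSON object
final class KafkaRecordMapper {

    // JSON keys listed in the same order as the fields of getKafkaRowType()
    static final String[] JSON_KEYS = {"user_id", "station_time", "score", "local_time"};

    private KafkaRecordMapper() {}

    // Returns {userId, stationTime, score, localTime}; missing keys become null
    static Object[] toOrderedValues(Map<String, Object> json) {
        Object[] values = new Object[JSON_KEYS.length];
        for (int i = 0; i < JSON_KEYS.length; i++) {
            Object raw = json.get(JSON_KEYS[i]);
            if (raw == null) {
                values[i] = null;
            } else if ("score".equals(JSON_KEYS[i])) {
                // IntType column: accept a JSON number or a numeric string
                values[i] = (raw instanceof Number)
                        ? Integer.valueOf(((Number) raw).intValue())
                        : Integer.valueOf(raw.toString().trim());
            } else {
                // VarCharType columns: numbers such as user_id are stringified
                values[i] = raw.toString();
            }
        }
        return values;
    }
}
```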

2.6 Running the job

Finally, attach the source, the conversion and the sink to env and execute the job:

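// lakePathNoPartition: path of the (unpartitioned) Delta table
env.addSource(source)
    .setParallelism(2)                          // two Kafka source subtasks
    .map(FlinkDeltaUtil::kafkaJsonToRowData)    // String -> RowData
    .sinkTo(FlinkDeltaUtil.createDeltaSink(lakePathNoPartition, FlinkDeltaUtil.getKafkaRowType()))
    .setParallelism(1);                         // a single Delta writer
env.execute("Flink-Read-Kafka-Json-To-Delta");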
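The source runs with parallelism 2, but each Kafka partition is read by exactly one source subtask. A topic with a single partition therefore leaves one subtask idle.

The sketch below restates, as we understand it, the assignment rule of the legacy FlinkKafkaConsumer (KafkaTopicPartitionAssigner). It computes a start index from the topic name, then assigns partitions round-robin by partition id. PartitionAssignment is a hypothetical class for illustration, not Flink's API.

```java
// Hypothetical illustration, not Flink's API: partition -> source subtask,
// following our reading of the legacy FlinkKafkaConsumer assigner
final class PartitionAssignment {

    private PartitionAssignment() {}

    // Index of the source subtask that reads the given partition of the topic
    static int assign(String topic, int partition, int numSubtasks) {
        if (numSubtasks <= 0) {
            throw new IllegalArgumentException("numSubtasks must be > 0");
        }
        // topic-dependent start index; the mask keeps an overflowed product non-negative
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numSubtasks;
        // partitions are then dealt out round-robin from the start index
        return (startIndex + partition) % numSubtasks;
    }

    // How many partitions each subtask reads, indexed by subtask id
    static int[] partitionsPerSubtask(String topic, int numPartitions, int numSubtasks) {
        int[] counts = new int[numSubtasks];
        for (int p = 0; p < numPartitions; p++) {
            counts[assign(topic, p, numSubtasks)]++;
        }
        return counts;
    }
}
```

With one partition and two subtasks, one entry of `partitionsPerSubtask("topic", 1, 2)` is 0. Source parallelism 2 only pays off when the topic has at least two partitions.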

2.7 Kafka producer script

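Below is a Python script (using the kafka-python package) that sends test messages to Kafka. Set topic and kafka_server to your topic name and Kafka broker address, then run it. Each message is a JSON object with the fields station_time, user_id, score and local_time. The script also appends a matching access-log line to log_file.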

# coding=utf-8
import json
import random
import time
import codecs
from kafka import KafkaProducer


log_file = "/opt/access.log"
topic = 'topic'
kafka_server = 'ip'

user_count = 100
log_count = 300
ip = [127, 156, 222, 105, 24, 192, 153, 127, 31, 168, 32, 10, 82, 77, 118, 228]
status_code = ("200",)
url_count = 10
content_uri_pattern = '/nanHu/contents/{content_id}?user_id={user_id}'


# Current time, used as local_time for every message in a run
def sample_time():
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


# Sample a random set of users
def sample_users():
    # Scaled-down simulation: user_count registered users, 10-50 of them active
    all_users = range(1, user_count)
    user_cont = random.randint(10, 50)
    users = random.sample(all_users, user_cont)
    return users


# Random IP address
def sample_ip():
    random_ip = random.sample(ip, 4)
    return ".".join([str(item) for item in random_ip])


# Random status code
def sample_status_code():
    return random.sample(status_code, 1)[0]


# Random dwell time (station_time)
def sample_station_time():
    return random.randint(15, 60)


# Random score
def sample_score():
    return random.randint(30, 100)


def generate_log(count=10):
    time_str = sample_time()
    users = sample_users()
    print('Start generate [%s] log..' % log_count)
    producer = KafkaProducer(bootstrap_servers=kafka_server)

    with codecs.open(log_file, "a+", encoding='utf-8') as f:

        while count >= 1:
            # Pick a random user
            user_id = random.choice(users)
            sample_content_id = str(random.randint(0, url_count))

            ret_url = content_uri_pattern.format(
                content_id=sample_content_id,
                user_id=user_id
            )
            query_log = u"{ip} [{local_time}] \"GET {url} HTTP/1.1\" {status_code}".format(
                url=ret_url,
                ip=sample_ip(),
                status_code=sample_status_code(),
                local_time=time_str
            )
            f.write(query_log + u'\n')

            event_log = {
                "station_time": str(sample_station_time()),
                "user_id": user_id,
                "score": sample_score(),
                "local_time": time_str
            }

            producer.send(topic, json.dumps(event_log).encode('utf-8'))
            if count % 100 == 0:
                print('generate msgs: [%s]' % count)
            count = count - 1

    producer.close()
    print('Finish generate log [%s]' % log_count)


if __name__ == '__main__':
    try:
        generate_log(log_count)
    except Exception as e:
        print(str(e))
        exit(-1)
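For a quick manual test without Python, a single message of the same shape can be typed into Kafka's console producer (kafka-console-producer.sh), which sends each input line as one message.

The hypothetical SampleMessage helper below builds such a line. It uses the same key order and value types that `json.dumps(event_log)` produces in the script on Python 3.7+: station_time as a string, and user_id and score as numbers.

```java
import java.util.Locale;

// Hypothetical helper: one JSON line shaped like the script's event_log
final class SampleMessage {

    private SampleMessage() {}

    // Key order and ", " / ": " separators follow json.dumps defaults.
    // localTime is inserted verbatim (no JSON escaping), so it must not contain quotes or backslashes.
    static String json(int userId, int stationTime, int score, String localTime) {
        return String.format(Locale.ROOT,
                "{\"station_time\": \"%d\", \"user_id\": %d, \"score\": %d, \"local_time\": \"%s\"}",
                stationTime, userId, score, localTime);
    }
}
```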

3. Source code

Repository (https://gitee.com/zhiling-chen/demo-mysql-flink-delta)

This article was contributed by a user; please credit the source when reposting: [wpsshop blog]