当前位置:   article > 正文

Flink基于时间窗口定时输出到ElasticSearch中并做到真正不丢数据_flink sink每隔多久执行一次

flink sink每隔多久执行一次

Flink时间窗口运用

介绍Flink定时输出到外部存储介质,有两种办法实现,在RichSinkFunction中实现SinkFunction的方法,在其中open()方法中引入java的定时任务

另一种实现,基于Flink window窗口机制,将结果定时sink到ElasticSearch中。

需求:


经过flink清洗后的数据,要求每500毫秒sink一次数据到ES中(该文件内容是String格式,需要进行追加,不属于大家可以用Java实现,具体代码我就不细讲了)。

实现:

1、序列化方法:

  1. /**
  2. * 自定义序列化器
  3. */
  4. public class CustomDeserialization implements DebeziumDeserializationSchema<String> {
  5. @Override
  6. public void deserialize(SourceRecord sourceRecord, Collector<String> collector)
  7. throws Exception {
  8. JSONObject res = new JSONObject();
  9. // 获取数据库和表名称
  10. String topic = sourceRecord.topic();
  11. String[] fields = topic.split("\\.");
  12. String database = fields[1];
  13. String tableName = fields[2];
  14. Struct value = (Struct) sourceRecord.value();
  15. // 获取before数据
  16. Struct before = value.getStruct("before");
  17. JSONObject beforeJson = new JSONObject();
  18. if (before != null) {
  19. Schema beforeSchema = before.schema();
  20. List<Field> beforeFields = beforeSchema.fields();
  21. for (Field field : beforeFields) {
  22. Object beforeValue = before.get(field);
  23. beforeJson.put(field.name(), beforeValue);
  24. }
  25. }
  26. // 获取after数据
  27. Struct after = value.getStruct("after");
  28. JSONObject afterJson = new JSONObject();
  29. if (after != null) {
  30. Schema afterSchema = after.schema();
  31. List<Field> afterFields = afterSchema.fields();
  32. for (Field field : afterFields) {
  33. Object afterValue = after.get(field);
  34. afterJson.put(field.name(), afterValue);
  35. }
  36. }
  37. //获取操作类型 READ DELETE UPDATE CREATE
  38. Envelope.Operation operation = Envelope.operationFor(sourceRecord);
  39. String type = operation.toString().toLowerCase();
  40. if ("create".equals(type)) {
  41. type = "insert";
  42. }
  43. // 将字段写到json对象中
  44. res.put("database", database);
  45. res.put("tableName", tableName);
  46. res.put("before", beforeJson);
  47. res.put("after", afterJson);
  48. res.put("type", type);
  49. //输出数据
  50. collector.collect(res.toString());
  51. }
  52. @Override
  53. public TypeInformation<String> getProducedType() {
  54. return BasicTypeInfo.STRING_TYPE_INFO;
  55. }
  56. }

以上时序列号方法,大家可以随意定义因为我这块用了 Flink CDC

2、时间窗口的启用:

  1. /**
  2. * 按时间开窗收集更新全量不会丢数
  3. */
  4. DataStream<List<String>> streamList = streamSource
  5. .windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
  6. .process(new ProcessAllWindowFunction<String, List<String>, TimeWindow>() {
  7. @Override
  8. public void process(Context context, Iterable<String> iterable, Collector<List<String>> collector) throws Exception {
  9. List<String> arrayList = new ArrayList<String>();
  10. iterable.forEach(single -> {
  11. arrayList.add(single);
  12. });
  13. if (arrayList.size() > 0) {
  14. collector.collect(arrayList);
  15. }
  16. }
  17. });

3、Sink下层处理

  1. @Override
  2. public void invoke(List<String> values, Context context) throws Exception {
  3. try {
  4. List<Map<String, Object>> list = new ArrayList<>();
  5. String tName = "";
  6. for (String value : values) {
  7. JSONObject jsonObject = JSON.parseObject(value.toString());
  8. // String arrayslist = Arrays.asList(pgConnection.getTableList()).toString();
  9. String schemaName = jsonObject.get("database").toString();
  10. String tableName = jsonObject.get("tableName").toString();
  11. //多表流需要判断处理,不一样流写入到ES索引也是不一样的
  12. //if (Arrays.asList(pgConnection.getTableList()).contains(schemaName + "." + tableName)) {
  13. tName = schemaName + "_" + tableName;
  14. JSONObject jsonAfter = JSON.parseObject(jsonObject.get("after").toString());
  15. //System.out.println(esLogAppendServer.getFields());
  16. if (jsonObject != null) {
  17. Map<String, Object> map = new HashMap<String, Object>();
  18. for (Map.Entry<String, Object> entry : jsonAfter.entrySet()) {
  19. //这里处理一下日期变成时间戳问题,以下进行遍历执行
  20. map.put(entry.getKey(), entry.getValue());
  21. }
  22. list.add(map);
  23. }
  24. }
  25. saveElasticSearch(tName, list);
  26. } catch (Exception ex) {
  27. log.info(DateUtils.getDate() + "---" + ex.toString());
  28. }
  29. }

测试:

 方便测试,先将时间改为每100毫秒执行,Time.milliseconds(100),通过开窗获取100毫秒的数据:

第1个时间窗口到达:Iterable中集合了这100毫秒接收的所有实时数据,统一处理

总结:

     Flink是实时处理,window机制可以认为是flink的批处理实现,因为需要等待水位线对齐触发timer。一般还基于时间窗口做一些批量处理不会丢数据,所以比较适合数据表全量更新。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/43788
推荐阅读
相关标签
  

闽ICP备14008679号