当前位置:   article > 正文

ElasticSearch学习之(十)-Logstash实现从ES同步数据到Mysql数据库_logstash 从es到mysql

logstash 从es到mysql

Logstash概述

       Logstash是一个具有实时管道功能的开源数据收集引擎,Logstash可以动态地将来自不同数据源的数据统一起来,并将数据规范化为所选择的目的地,清理和大众化所有数据,用于各种高级下游分析和可视化用例。虽然Logstash最初推动了日志收集方面的创新,但是它的功能远远超出了这个用例,任何类型的事件都可以通过大量的输入、过滤器和输出插件来丰富和转换,使用许多原生编解码可以进一步简化摄取过程。Logstash通过利用大量和多种数据来提高洞察力。

Logstash工作原理

logstash主要有三个核心环节:inputs → filters → outputs大致工作流程如下:

       

Input:输入数据到logstash

一些常用的输入为:

  • file:从文件系统的文件中读取,类似于tial -f命令;
  • syslog:在514端口上监听系统日志消息,并根据RFC3164标准进行解析;
  • redis:从redis service中读取;
  • beats:从filebeat中读取;

Filters:数据中间处理,对数据进行操作。

一些常用的过滤器为:

  • grok:解析任意文本数据,Grok 是 Logstash 最重要的插件。它的主要作用就是将文本格式的字符串,转换成为具体的结构化的数据,配合正则表达式使用。内置120多个解析语法。官方提供的grok表达式:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
  • grok在线调试:https://grokdebug.herokuapp.com/
  • mutate:对字段进行转换。例如对字段进行删除、替换、修改、重命名等;
  • drop:丢弃一部分events不进行处理;
  • clone:拷贝 event,这个过程中也可以添加或移除字段;
  • geoip:添加地理信息(为前台kibana图形化展示使用);

Outputsoutputslogstash处理管道的最末端组件。一个event可以在处理过程中经过多重输出,但是一旦所有的outputs都执行结束,这个event也就完成生命周期。

一些常见的outputs为:

  • elasticsearch:可以高效的保存数据,并且能够方便和简单的进行查询;
  • file:将event数据保存到文件中;
  • graphite:将event数据发送到图形化组件中,一个很流行的开源存储图形化展示的组件;

Codecscodecs 是基于数据流的过滤器,它可以作为inputoutput的一部分配置。Codecs可以帮助你轻松的分割发送过来已经被序列化的数据。

一些常见的codecs:

  • json:使用json格式对数据进行编码/解码;
  • multiline:将汇多个事件中数据汇总为一个单一的行。比如:java异常信息和堆栈信息;

DEMO(数据由ES通过logstash同步到MYSQL数据库)

  1. input {
  2. elasticsearch {
  3. hosts => ["127.0.0.1:9998"]
  4. index => "test_customer"
  5. size => 5000
  6. user => "elastic"
  7. password => "testsearch"
  8. }
  9. }
  10. filter{
  11. json{
  12. source => "doc"
  13. }
  14. date {
  15. match => ["[doc][create_time]","yyyy-MM-dd HH:mm:ss", "ISO8601"]
  16. target => "[create_time]"
  17. }
  18. date {
  19. match => ["[doc][update_time]","yyyy-MM-dd HH:mm:ss", "ISO8601"]
  20. target => "[update_time]"
  21. }
  22. }
  23. output {
  24. stdout{
  25. codec=>rubydebug
  26. }
  27. jdbc{
  28. driver_jar_path => "/data/soft/mysql-connector-java-5.1.44.jar"
  29. driver_class => "com.mysql.jdbc.Driver"
  30. connection_string => "jdbc:mysql://127.0.0.1:6379"
  31. username => "test"
  32. password => "test"
  33. statement => [ "insert into CUSTOMER_INFO (ID, CUSTOMER_MOBILE, TRADING_MOBILE, CUSTOMER_TYPE, CUSTOMER_NAME, CARD_TYPE, CARD_NO, FACE_ID, STATUS, CREATE_TIME, UPDATE_TIME, SOURCE, CUSTOMER_SEQ, LMIN_ID, LM_ID) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)","[doc][id]","[doc][customer_mobile]","[doc][trading_mobile]","[doc][customer_name]","[doc][customer_type]","[doc][card_type]","[doc][card_no]","[doc][face_id]","[doc][status]","[doc][create_time]","[doc][update_time]","[doc][source]","[doc][customer_seq]","[doc][lmin_id]","[doc][lm_id]"]
  34. }
  35. }

如有披露或问题欢迎留言或者入群探讨

 

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/312697
推荐阅读
相关标签
  

闽ICP备14008679号