
Spring Boot Integration with Elasticsearch: Data Synchronization Approaches and Troubleshooting

I. Integrating Elasticsearch with Spring Boot

1. Add the Maven dependencies

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.plugin</groupId>
        <artifactId>transport-netty4-client</artifactId>
        <version>6.2.3</version>
    </dependency>

2. Configure the connection properties

    spring:
      data:
        elasticsearch:
          repositories:
            enabled: true
          cluster-name: elasticsearch
          cluster-nodes: 127.0.0.1:9300

3. The project uses Spring Boot 2.1.9.RELEASE; make sure the Elasticsearch server version matches what the corresponding Spring Data Elasticsearch release supports (for Spring Boot 2.1.x this is the Elasticsearch 6.x line used here; see the compatibility table in the Spring Data Elasticsearch documentation).

4. Start the project

Startup fails with: Error creating bean with name 'elasticsearchClient', availableProcessors is already set to [4]. The common explanation is a conflict between the Elasticsearch transport client and Redis: both initialize Netty and both try to set Netty's availableProcessors.

Fix: add the following to the application's main class:

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class HumanPersonalApplication {
        public static void main(String[] args) {
            System.setProperty("es.set.netty.runtime.available.processors", "false");
            SpringApplication.run(HumanPersonalApplication.class, args);
        }
    }

Also set the same property in the test class, for example in its constructor (a fuller sketch of the test class follows the snippet):

    public EsTest() {
        System.setProperty("es.set.netty.runtime.available.processors", "false");
    }
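For context, here is a minimal sketch of what the surrounding test class could look like, assuming JUnit 4 (the Spring Boot 2.1 default). The bean names mirror the fields used in the later snippets; the repository and MyBatis mapper are project classes and are only mentioned in comments:

    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
    import org.springframework.test.context.junit4.SpringRunner;

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class EsTest {

        // must be set before the Elasticsearch transport client initializes Netty
        public EsTest() {
            System.setProperty("es.set.netty.runtime.available.processors", "false");
        }

        @Autowired
        private ElasticsearchTemplate elasticsearchTemplate;

        // the repository (esRespostory) and the MyBatis mapper (enterpriseBidStaffMapper)
        // used in section 1.1 would be autowired here in the same way

        // @Test methods such as same() go below
    }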

II. Download and install matching versions of Elasticsearch, Logstash, and Kibana

Download page: https://www.elastic.co/cn/downloads/

 

III. Two ways to synchronize the data

1. Synchronizing through application code

1.1 Perform the sync in the service layer: query the relational data, assemble it into Resume objects, and save them to Elasticsearch (a sketch of the entity and repository follows the snippet).

    @Test
    public void same() {
        // create the index from the entity mapping
        elasticsearchTemplate.createIndex(Resume.class);
        List<Resume> list = enterpriseBidStaffMapper.selectAll();
        for (Resume r : list) {
            List<EducationExperiences> educationExperiences = r.getEducationExperiences();
            if (educationExperiences != null && educationExperiences.size() > 0) {
                for (EducationExperiences e : educationExperiences) {
                    if (e.getEducation() != null) {
                        e.setEducationo(Transformation.getEducation(e.getEducation()));
                    }
                }
            }
        }
        // bulk-save the data into Elasticsearch
        esRespostory.saveAll(list);
        System.out.println(list.size());
    }
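For reference, a minimal sketch of the Resume entity and the repository used above, assuming Spring Data Elasticsearch 3.1-style annotations. The index and type names are assumptions taken from the Logstash output section, and only a few fields are shown:

    import java.time.LocalDateTime;

    import org.springframework.data.annotation.Id;
    import org.springframework.data.elasticsearch.annotations.Document;
    import org.springframework.data.elasticsearch.annotations.Field;
    import org.springframework.data.elasticsearch.annotations.FieldType;
    import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

    // index/type names assumed to match the Logstash output section ("human" / "resumans")
    @Document(indexName = "human", type = "resumans")
    public class Resume {

        @Id
        private Long bsid;

        @Field(type = FieldType.Text, analyzer = "ik_max_word")
        private String hopeOccupationName;

        @Field(type = FieldType.Keyword)
        private String jobNature;

        private LocalDateTime createTime;

        // the child lists (educationExperiences, workExperience, skill, certificate)
        // are project classes and would be mapped with @Field(type = FieldType.Nested)

        // getters and setters omitted
    }

    // the repository injected as esRespostory in the test above
    interface EsRespostory extends ElasticsearchRepository<Resume, Long> {
    }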

1.2 Handling LocalDateTime (de)serialization (LocalDateTimeSerializer and LocalDateTimeDeserializer come from the jackson-datatype-jsr310 module):

    /**
     * creation time
     */
    @DateTimeFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS")
    @JsonSerialize(using = LocalDateTimeSerializer.class)
    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    private LocalDateTime createTime;

1.3 Marking fields as analyzed or not analyzed

    /**
     * FieldType.Text: the field is analyzed (tokenized)
     */
    @Field(type = FieldType.Text, analyzer = "ik_max_word")
    private String hopeOccupationName;

    /**
     * FieldType.Keyword: the field is not analyzed
     */
    @Field(type = FieldType.Keyword)
    private String jobNature;

1.4 Paged queries against Elasticsearch (the boolQueryBuilder used here is built separately; see the sketch after the snippet)

    Integer pageNo = Integer.valueOf(maps.get("page").toString());
    Integer pageSize = Integer.valueOf(maps.get("limit").toString());
    // PageRequest.of(zero-based page index, page size, Sort.by(Sort.Direction.DESC, "sort field"))
    PageRequest pageRequest = PageRequest.of(pageNo - 1, pageSize, Sort.by(Sort.Direction.DESC, "id"));
    org.springframework.data.domain.Page<Resume> page = esRespostory.search(boolQueryBuilder, pageRequest);
    List<Resume> resumes = page.getContent();
    Map<String, Object> result = new HashMap<>();
    result.put("records", resumes);
    result.put("total", page.getTotalElements());
    result.put("size", pageSize);
    result.put("current", pageNo);
    int totalElements = (int) page.getTotalElements();
    result.put("pages", totalElements % pageSize == 0 ? totalElements / pageSize : totalElements / pageSize + 1);
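The boolQueryBuilder passed to search() above is not shown in the original snippet; here is a minimal sketch of how it could be built with the Elasticsearch 6.x QueryBuilders API. The field names come from the mapping in section 2.3.1, while the query values are placeholders:

    import org.apache.lucene.search.join.ScoreMode;
    import org.elasticsearch.index.query.BoolQueryBuilder;
    import org.elasticsearch.index.query.QueryBuilders;

    // ...

    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();

    // keyword field: exact match, no analysis
    boolQueryBuilder.filter(QueryBuilders.termQuery("jobNature", "fullTime"));

    // analyzed text field: full-text match (ik_max_word)
    boolQueryBuilder.must(QueryBuilders.matchQuery("hopeOccupationName", "Java"));

    // fields inside a nested object must be queried through a nested query
    boolQueryBuilder.must(QueryBuilders.nestedQuery(
            "educationExperiences",
            QueryBuilders.matchQuery("educationExperiences.schoolName", "some school"),
            ScoreMode.None));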

2. Incremental synchronization with Logstash

2.1 Edit the Logstash pipeline configuration

    # Sample Logstash configuration for creating a simple
    # Beats -> Logstash -> Elasticsearch pipeline.
    input {
      stdin {}
      jdbc {
        jdbc_driver_library => "E:\\Elasticsearch\\logstash-6.4.3\\mysql-connector-java-5.1.47.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://ip:port/sipaote_human_test?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true"
        jdbc_user => "name"
        jdbc_password => "pwd"
        jdbc_paging_enabled => true
        # record the last run: if true, the last value of the tracking_column field is
        # saved to the file specified by last_run_metadata_path
        record_last_run => true
        # whether to track the value of a specific column
        use_column_value => true
        # required when use_column_value is true: the database column to track; it must be monotonically increasing
        tracking_column => "updateTime"
        tracking_column_type => "timestamp"
        last_run_metadata_path => "E:\\Elasticsearch\\logstash-6.4.3\\.logstash_jdbc_last_run"
        lowercase_column_names => false
        # avoid garbled Chinese characters
        codec => plain { charset => "UTF-8" }
        # index type
        #type => "bid"
        clean_run => false
        schedule => "*/5 * * * * *"
        statement_filepath => "E:\\Elasticsearch\\logstash-6.4.3\\my.sql"
        #statement => ""
      }
    }
    filter {
      aggregate {
        task_id => "%{bsid}"
        code => "
          map['bsid'] = event.get('bsid')
          map['hopeCityName'] = event.get('hopeCityName')
          map['hopeIndustryName'] = event.get('hopeIndustryName')
          map['hopeOccupationName'] = event.get('hopeOccupationName')
          map['jobNature'] = event.get('jobNature')
          map['name'] = event.get('name')
          map['tel'] = event.get('tel')
          map['createTime'] = event.get('createTime')
          map['sex'] = event.get('sex')
          map['createUser'] = event.get('createUser')
          map['enterpriseId'] = event.get('enterpriseId')
          map['headhuntingName'] = event.get('headhuntingName')
          map['isBlack'] = event.get('isBlack')
          # use map['w_list'] to deduplicate work-experience rows
          map['w_list'] ||= []
          map['workExperience'] ||= []
          if (event.get('wid') != nil)
            if !(map['w_list'].include? event.get('wid'))
              map['w_list'] << event.get('wid')
              map['workExperience'] << {
                'wid' => event.get('wid'),
                'industryName' => event.get('industryName'),
                'skillsDescribe' => event.get('skillsDescribe')
              }
            end
          end
          map['e_list'] ||= []
          map['educationExperiences'] ||= []
          if (event.get('eid') != nil)
            if !(map['e_list'].include? event.get('eid'))
              map['e_list'] << event.get('eid')
              map['educationExperiences'] << {
                'eid' => event.get('eid'),
                'schoolName' => event.get('schoolName'),
                'speciality' => event.get('speciality')
              }
            end
          end
          map['s_list'] ||= []
          map['skill'] ||= []
          if (event.get('sid') != nil)
            if !(map['s_list'].include? event.get('sid'))
              map['s_list'] << event.get('sid')
              map['skill'] << {
                'sid' => event.get('sid'),
                'skillName' => event.get('skillName')
              }
            end
          end
          map['c_list'] ||= []
          map['certificate'] ||= []
          if (event.get('cid') != nil)
            if !(map['c_list'].include? event.get('cid'))
              map['c_list'] << event.get('cid')
              map['certificate'] << {
                'cid' => event.get('cid'),
                'certificateName' => event.get('certificateName')
              }
            end
          end
          event.cancel()
        "
        push_previous_map_as_event => true
        timeout => 5
      }
      json {
        source => "message"
        remove_field => ["message"]
        #remove_field => ["message", "type", "@timestamp", "@version"]
      }
      mutate {
        # filter out the JSON fields that should not be stored in ES
        remove_field => ["tags", "@timestamp", "@version"]
      }
    }
    output {
      #if[type]=="bid"{
      elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "human"
        document_type => "resumans"
        document_id => "%{bsid}"
      }
      stdout {
        codec => json
      }
      #}
    }

2.2 The multi-table join query (referenced as my.sql above). The deeply nested IF/IFNULL expressions compute the greatest update_time across the main table (s_enterprise_bid_staff) and every joined child table, so a change to any related row advances updateTime, which is the tracking column driving the incremental sync:

    SELECT
      bs.id bsid,
      bs.hope_city_name hopeCityName,
      bs.hope_industry_name hopeIndustryName,
      bs.hope_occupation_name hopeOccupationName,
      bs.job_nature jobNature,
      bs.job_status jobStatus,
      bs.`name`,
      bs.tel,
      bs.create_time createTime,
      bs.sex,
      bs.create_user createUser,
      IF(
        IFNULL(bs.update_time,'1997-1-1 00:00:00')>IFNULL(w.update_time,'1997-1-1 00:00:00'),
        IF(
          IFNULL(bs.update_time,'1997-1-1 00:00:00')>IFNULL(e.update_time,'1997-1-1 00:00:00'),
          IF(
            IFNULL(bs.update_time,'1997-1-1 00:00:00')>IFNULL(c.update_time,'1997-1-1 00:00:00'),
            IF(
              IFNULL(bs.update_time,'1997-1-1 00:00:00')>IFNULL(s.update_time,'1997-1-1 00:00:00'),
              IF(IFNULL(bs.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),bs.update_time,k.update_time),
              IF(IFNULL(s.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),s.update_time,k.update_time)
            ),
            IF(IFNULL(c.update_time,'1997-1-1 00:00:00')>IFNULL(s.update_time,'1997-1-1 00:00:00'),
              IF(IFNULL(c.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),c.update_time,k.update_time),
              IF(IFNULL(s.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),s.update_time,k.update_time)
            )
          ),
          IF(
            IFNULL(e.update_time,'1997-1-1 00:00:00')>IFNULL(c.update_time,'1997-1-1 00:00:00'),
            IF(
              IFNULL(e.update_time,'1997-1-1 00:00:00')>IFNULL(s.update_time,'1997-1-1 00:00:00'),
              IF(IFNULL(e.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),e.update_time,k.update_time),
              IF(IFNULL(s.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),s.update_time,k.update_time)
            ),
            IF(
              IFNULL(c.update_time,'1997-1-1 00:00:00')>IFNULL(s.update_time,'1997-1-1 00:00:00'),
              IF(IFNULL(c.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),c.update_time,k.update_time),
              IF(IFNULL(s.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),s.update_time,k.update_time)
            )
          )
        ),
        IF(
          IFNULL(w.update_time,'1997-1-1 00:00:00')>IFNULL(e.update_time,'1997-1-1 00:00:00'),
          IF(
            IFNULL(w.update_time,'1997-1-1 00:00:00')>IFNULL(c.update_time,'1997-1-1 00:00:00'),
            IF(
              IFNULL(w.update_time,'1997-1-1 00:00:00')>IFNULL(s.update_time,'1997-1-1 00:00:00'),
              IF(IFNULL(w.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),w.update_time,k.update_time),
              IF(IFNULL(s.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),s.update_time,k.update_time)
            ),
            IF(
              IFNULL(c.update_time,'1997-1-1 00:00:00')>IFNULL(s.update_time,'1997-1-1 00:00:00'),
              IF(IFNULL(c.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),c.update_time,k.update_time),
              IF(IFNULL(s.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),s.update_time,k.update_time)
            )
          ),
          IF(
            IFNULL(e.update_time,'1997-1-1 00:00:00')>IFNULL(c.update_time,'1997-1-1 00:00:00'),
            IF(
              IFNULL(e.update_time,'1997-1-1 00:00:00')>IFNULL(s.update_time,'1997-1-1 00:00:00'),
              IF(IFNULL(e.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),e.update_time,k.update_time),
              IF(IFNULL(s.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),s.update_time,k.update_time)
            ),
            IF(
              IFNULL(c.update_time,'1997-1-1 00:00:00')>IFNULL(s.update_time,'1997-1-1 00:00:00'),
              IF(IFNULL(c.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),c.update_time,k.update_time),
              IF(IFNULL(s.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),s.update_time,k.update_time)
            )
          )
        )
      ) updateTime,
      w.id wid,
      w.industry_name industryName,
      w.skills_describe skillsDescribe,
      e.id eid,
      e.school_name schoolName,
      e.speciality speciality,
      e.education education,
      c.id cid,
      c.certificate_name certificateName,
      s.id sid,
      s.skill_name skillName,
      u.`name` headhuntingName,
      IF(k.id is NULL,0,1) isBlack
    FROM s_enterprise_bid_staff bs
    LEFT JOIN s_work_experience w ON bs.id=w.bid_staff_id
    LEFT JOIN s_education_experience e ON bs.id=e.bid_staff_id
    LEFT JOIN s_certificate c ON bs.id=c.bid_staff_id
    LEFT JOIN s_skill s ON bs.id=s.bid_staff_id
    LEFT JOIN s_enterprise_user u ON bs.bid_enterprise_id=u.id
    LEFT JOIN s_blacklist k ON k.bid_staff_id=bs.id AND is_delete=0
    WHERE
      bs.id IN (
        SELECT
          bs.id bsid
        FROM s_enterprise_bid_staff bs
        LEFT JOIN s_work_experience w ON bs.id=w.bid_staff_id
        LEFT JOIN s_education_experience e ON bs.id=e.bid_staff_id
        LEFT JOIN s_certificate c ON bs.id=c.bid_staff_id
        LEFT JOIN s_skill s ON bs.id=s.bid_staff_id
        LEFT JOIN s_enterprise_user u ON bs.bid_enterprise_id=u.id
        LEFT JOIN s_blacklist k ON k.bid_staff_id=bs.id
        WHERE
          IF(
            IFNULL(bs.update_time,'1997-1-1 00:00:00')>IFNULL(w.update_time,'1997-1-1 00:00:00'),
            IF(
              IFNULL(bs.update_time,'1997-1-1 00:00:00')>IFNULL(e.update_time,'1997-1-1 00:00:00'),
              IF(
                IFNULL(bs.update_time,'1997-1-1 00:00:00')>IFNULL(c.update_time,'1997-1-1 00:00:00'),
                IF(
                  IFNULL(bs.update_time,'1997-1-1 00:00:00')>IFNULL(s.update_time,'1997-1-1 00:00:00'),
                  IF(IFNULL(bs.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),bs.update_time,k.update_time),
                  IF(IFNULL(s.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),s.update_time,k.update_time)
                ),
                IF(IFNULL(c.update_time,'1997-1-1 00:00:00')>IFNULL(s.update_time,'1997-1-1 00:00:00'),
                  IF(IFNULL(c.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),c.update_time,k.update_time),
                  IF(IFNULL(s.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),s.update_time,k.update_time)
                )
              ),
              IF(
                IFNULL(e.update_time,'1997-1-1 00:00:00')>IFNULL(c.update_time,'1997-1-1 00:00:00'),
                IF(
                  IFNULL(e.update_time,'1997-1-1 00:00:00')>IFNULL(s.update_time,'1997-1-1 00:00:00'),
                  IF(IFNULL(e.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),e.update_time,k.update_time),
                  IF(IFNULL(s.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),s.update_time,k.update_time)
                ),
                IF(
                  IFNULL(c.update_time,'1997-1-1 00:00:00')>IFNULL(s.update_time,'1997-1-1 00:00:00'),
                  IF(IFNULL(c.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),c.update_time,k.update_time),
                  IF(IFNULL(s.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),s.update_time,k.update_time)
                )
              )
            ),
            IF(
              IFNULL(w.update_time,'1997-1-1 00:00:00')>IFNULL(e.update_time,'1997-1-1 00:00:00'),
              IF(
                IFNULL(w.update_time,'1997-1-1 00:00:00')>IFNULL(c.update_time,'1997-1-1 00:00:00'),
                IF(
                  IFNULL(w.update_time,'1997-1-1 00:00:00')>IFNULL(s.update_time,'1997-1-1 00:00:00'),
                  IF(IFNULL(w.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),w.update_time,k.update_time),
                  IF(IFNULL(s.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),s.update_time,k.update_time)
                ),
                IF(
                  IFNULL(c.update_time,'1997-1-1 00:00:00')>IFNULL(s.update_time,'1997-1-1 00:00:00'),
                  IF(IFNULL(c.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),c.update_time,k.update_time),
                  IF(IFNULL(s.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),s.update_time,k.update_time)
                )
              ),
              IF(
                IFNULL(e.update_time,'1997-1-1 00:00:00')>IFNULL(c.update_time,'1997-1-1 00:00:00'),
                IF(
                  IFNULL(e.update_time,'1997-1-1 00:00:00')>IFNULL(s.update_time,'1997-1-1 00:00:00'),
                  IF(IFNULL(e.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),e.update_time,k.update_time),
                  IF(IFNULL(s.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),s.update_time,k.update_time)
                ),
                IF(
                  IFNULL(c.update_time,'1997-1-1 00:00:00')>IFNULL(s.update_time,'1997-1-1 00:00:00'),
                  IF(IFNULL(c.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),c.update_time,k.update_time),
                  IF(IFNULL(s.update_time,'1997-1-1 00:00:00')>IFNULL(k.update_time,'1997-1-1 00:00:00'),s.update_time,k.update_time)
                )
              )
            )
          ) > :sql_last_value
      )
    ORDER BY bs.id DESC

2.3 Problems encountered and solutions

2.3.1 Syncing nested data types with Logstash: create the corresponding mapping first

    PUT /human
    {
      "mappings": {
        "resumans": {
          "properties": {
            "bsid": {
              "type": "integer"
            },
            "hopeCityName": {
              "type": "text",
              "analyzer": "ik_max_word",
              "search_analyzer": "ik_max_word"
            },
            "hopeIndustryName": {
              "type": "text",
              "analyzer": "ik_max_word",
              "search_analyzer": "ik_max_word"
            },
            "hopeOccupationName": {
              "type": "text"
            },
            "jobNature": {
              "type": "text"
            },
            "name": {
              "type": "keyword"
            },
            "tel": {
              "type": "keyword"
            },
            "createTime": {
              "type": "date"
            },
            "sex": {
              "type": "integer"
            },
            "createUser": {
              "type": "integer"
            },
            "enterpriseId": {
              "type": "integer"
            },
            "headhuntingName": {
              "type": "keyword"
            },
            "isBlack": {
              "type": "integer"
            },
            "workExperience": {
              "type": "nested",
              "properties": {
                "wid": {
                  "type": "integer"
                },
                "industryName": {
                  "type": "text"
                },
                "skillsDescribe": {
                  "type": "text"
                }
              }
            },
            "educationExperiences": {
              "type": "nested",
              "properties": {
                "eid": {
                  "type": "integer"
                },
                "schoolName": {
                  "type": "text",
                  "analyzer": "ik_max_word",
                  "search_analyzer": "ik_max_word"
                },
                "speciality": {
                  "type": "text",
                  "analyzer": "ik_max_word",
                  "search_analyzer": "ik_max_word"
                }
              }
            },
            "skill": {
              "type": "nested",
              "properties": {
                "sid": {
                  "type": "integer"
                },
                "skillName": {
                  "type": "text",
                  "analyzer": "ik_max_word",
                  "search_analyzer": "ik_max_word"
                }
              }
            },
            "certificate": {
              "type": "nested",
              "properties": {
                "cid": {
                  "type": "integer"
                },
                "certificateName": {
                  "type": "text",
                  "analyzer": "ik_max_word",
                  "search_analyzer": "ik_max_word"
                }
              }
            }
          }
        }
      }
    }

2.3.2 When Logstash syncs, objects inside the nested child arrays can appear more than once.

Solution: keep a list of ids already seen for each child array in the aggregate code block and only append rows with new ids:

    # use map['w_list'] to deduplicate work-experience rows
    map['w_list'] ||= []
    map['workExperience'] ||= []
    if (event.get('wid') != nil)
      if !(map['w_list'].include? event.get('wid'))
        map['w_list'] << event.get('wid')
        map['workExperience'] << {
          'wid' => event.get('wid'),
          'industryName' => event.get('industryName'),
          'skillsDescribe' => event.get('skillsDescribe')
        }
      end
    end

2.3.3 Logstash is always one record behind: the last aggregated record is only flushed when the Logstash service is stopped.

Fix: add timeout => 5 to the aggregate filter configuration.

While the aggregate filter is building its event map, it has no way of knowing which row is the last one for a task, so the final aggregated event is normally only pushed when the next task begins (push_previous_map_as_event) or when the pipeline shuts down. The timeout tells the filter to flush the current map after that many seconds and move on. This is still not fully rigorous, because you cannot be sure how long aggregating one task will actually take; the cleaner approach is to define an explicit end-of-task condition.

2.3.4 During the first full sync, Logstash may drop objects from the nested child arrays.

A workaround is described here: https://blog.csdn.net/menglinjie/article/details/102984845
