赞
踩
一、Spring Boot 集成 Elasticsearch
1、引入 Maven 依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

<!--
    Netty 4 transport used by the Elasticsearch TransportClient.
    No explicit version: Spring Boot's dependency management supplies the one that matches the
    managed Elasticsearch client. Pinning it (previously 6.2.3) puts two different Elasticsearch
    versions on the classpath.
    NOTE(review): Spring Boot 2.1.x manages Elasticsearch 6.4.x; confirm with `mvn dependency:tree`.
-->
<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>transport-netty4-client</artifactId>
</dependency>
2、配置 Elasticsearch 连接属性
# Spring Data Elasticsearch (TransportClient) connection settings.
# The keys must be nested so they resolve to spring.data.elasticsearch.* properties.
spring:
  data:
    elasticsearch:
      repositories:
        enabled: true
      cluster-name: elasticsearch
      # Transport port (Elasticsearch default 9300), not the 9200 HTTP port used by Logstash.
      cluster-nodes: 127.0.0.1:9300
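3、Spring Boot 版本为 2.1.9.RELEASE。原文中的版本对应关系图已缺失，可参考 Spring Data Elasticsearch 官方文档中的版本兼容表：Spring Boot 2.1.x 对应 Spring Data Elasticsearch 3.1.x（Elasticsearch 6.x）。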
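4、启动项目：在启动类（以及测试类的构造方法）中，于 Spring 上下文启动之前设置系统属性 es.set.netty.runtime.available.processors=false。当其他组件（例如 Redis 客户端）已先初始化 Netty 时，这样通常可以避免 Elasticsearch 客户端启动时报 "availableProcessors is already set" 错误。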
- @SpringBootApplication
- public class HumanPersonalApplication {
-
- public static void main(String[] args) {
- System.setProperty("es.set.netty.runtime.available.processors","false");
- SpringApplication.run(HumanPersonalApplication.class, args);
- }
-
- }
/**
 * Test-class constructor: disables the Elasticsearch Netty processor-count property, mirroring
 * what the application's main method does before the Spring context is started.
 */
public EsTest() {
    String nettyProcessorsFlag = "es.set.netty.runtime.available.processors";
    System.setProperty(nettyProcessorsFlag, "false");
}
二、下载对应版本的Elasticsearch、Logstash、kibana进行安装
下载地址：https://www.elastic.co/cn/downloads/（版本需与项目使用的 Elasticsearch 客户端保持一致，本文使用 6.4.3）
1.1 在业务层同步数据：将查询结果封装为对象后批量写入 ES
- @Test
- public void same(){
- //创建索引
- elasticsearchTemplate.createIndex(Resume.class);
- List<Resume> list = enterpriseBidStaffMapper.selectAll();
- for (Resume r:list) {
- List<EducationExperiences> educationExperiences = r.getEducationExperiences();
- if(educationExperiences!=null && educationExperiences.size()>0){
- for (EducationExperiences e:educationExperiences) {
- if(e.getEducation()!=null){
- e.setEducationo(Transformation.getEducation(e.getEducation()));
- }
- }
- }
- }
- //批量同步数据
- esRespostory.saveAll(list);
- System.out.println(list.size());
- }

1.2 LocalDateTime 序列化与反序列化问题的处理方案：
- /**
- * 创建时间
- */
- @DateTimeFormat(pattern = "yyyy-MM-dd\'T\'HH:mm:ss.SSS")
- @JsonSerialize(using = LocalDateTimeSerializer.class)
- @JsonDeserialize(using = LocalDateTimeDeserializer.class)
- private LocalDateTime createTime;
1.3 分词字段与不分词字段的设置
- /**
- * FieldType.Text 定义该字段需要分词
- */
- @Field(type = FieldType.Text,analyzer = "ik_max_word")
- private String hopeOccupationName;
-
- /**
- * FieldType.Keyword定义字段不需要分词
- */
- @Field(type = FieldType.Keyword)
- private String jobNature;
1.4 ES 分页查询
- Integer pageNo = Integer.valueOf(maps.get("page").toString());
- Integer pageSize = Integer.valueOf(maps.get("limit").toString());
- //pageNo-1当前页
- //每页记录数
- //Sort.by(Sort.Direction.DESC,"排序字段")
- PageRequest pageRequest = PageRequest.of(pageNo-1, pageSize, Sort.by(Sort.Direction.DESC,"id"));
- org.springframework.data.domain.Page<Resume> page = esRespostory.search(boolQueryBuilder,pageRequest);
- List<Resume> resumes = page.getContent();
- Map<String, Object> result = new HashMap<>();
- result.put("records" ,resumes);
- result.put("total", page.getTotalElements());
- result.put("size",pageSize);
- result.put("current",pageNo);
- int totalElements = (int)page.getTotalElements();
- result.put("pages",totalElements%pageSize==0?totalElements/pageSize:totalElements/pageSize+1);
2.1 修改 Logstash 管道配置文件
# Logstash pipeline: MySQL (jdbc input) -> one aggregated document per staff member (bsid)
# -> Elasticsearch index "human".
#
# NOTE: the aggregate filter requires that all rows of one task_id are handled in order by the
# same worker, so run this pipeline with a single filter worker (-w 1 / pipeline.workers: 1).

input {

  # NOTE(review): presumably only for manual testing; stdin events carry a JSON message field
  # (see the json filter below).
  stdin {}

  jdbc {
    jdbc_driver_library => "E:\\Elasticsearch\\logstash-6.4.3\\mysql-connector-java-5.1.47.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://ip:port/sipaote_human_test?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true"
    jdbc_user => "name"
    jdbc_password => "pwd"
    # Paging splits the statement into several LIMIT/OFFSET queries.
    # NOTE(review): confirm the rows of one bsid stay contiguous across pages, otherwise the
    # aggregate filter can split one document.
    jdbc_paging_enabled => true
    # Persist the last value of tracking_column to last_run_metadata_path so the next run
    # resumes from it.
    record_last_run => true
    # Use a column value (instead of the last run time) as sql_last_value.
    use_column_value => true
    # Column to track (required when use_column_value is true); it must only ever increase.
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "E:\\Elasticsearch\\logstash-6.4.3\\.logstash_jdbc_last_run"
    # Keep camelCase column aliases such as updateTime and bsid as-is, so tracking_column matches.
    lowercase_column_names => false
    # Read text (Chinese content) as UTF-8.
    codec => plain { charset => "UTF-8"}
    # Document type
    #type => "bid"
    clean_run => false
    # Six-field cron with a leading seconds field: run every 5 seconds.
    schedule => "*/5 * * * * *"
    statement_filepath => "E:\\Elasticsearch\\logstash-6.4.3\\my.sql"
    #statement => ""
  }
}

filter {
  # Fold all joined rows that share a bsid into one map, emitted as a single document.
  aggregate {
    task_id => "%{bsid}"
    code => "
      map['bsid'] = event.get('bsid')
      map['hopeCityName'] = event.get('hopeCityName')
      map['hopeIndustryName'] = event.get('hopeIndustryName')
      map['hopeOccupationName'] = event.get('hopeOccupationName')
      map['jobNature'] = event.get('jobNature')
      map['name'] = event.get('name')
      map['tel'] = event.get('tel')
      map['createTime'] = event.get('createTime')
      map['sex'] = event.get('sex')
      map['createUser'] = event.get('createUser')
      # NOTE(review): the multi-table SQL statement selects no enterpriseId column, so this is
      # presumably always nil; confirm or add the column to the statement.
      map['enterpriseId'] = event.get('enterpriseId')
      map['headhuntingName'] = event.get('headhuntingName')
      map['isBlack'] = event.get('isBlack')
      # The LEFT JOINs repeat each child row once per combination of the other child rows, so each
      # *_list array remembers the ids already added and a child is appended only once.
      # The *_list helper arrays are removed again by the mutate filter.
      map['w_list'] ||= []
      map['workExperience'] ||= []
      if(event.get('wid')!=nil)
        if !(map['w_list'].include? event.get('wid'))
          map['w_list'] << event.get('wid')
          map['workExperience'] << {
            'wid' => event.get('wid'),
            'industryName' => event.get('industryName'),
            'skillsDescribe' => event.get('skillsDescribe')
          }
        end
      end
      map['e_list'] ||= []
      map['educationExperiences'] ||= []
      if(event.get('eid')!=nil)
        if !(map['e_list'].include? event.get('eid'))
          map['e_list'] << event.get('eid')
          map['educationExperiences'] << {
            'eid' => event.get('eid'),
            'schoolName' => event.get('schoolName'),
            'speciality' => event.get('speciality')
          }
        end
      end
      map['s_list'] ||= []
      map['skill'] ||= []
      if(event.get('sid')!=nil)
        if !(map['s_list'].include? event.get('sid'))
          map['s_list'] << event.get('sid')
          map['skill'] << {
            'sid' => event.get('sid'),
            'skillName' => event.get('skillName')
          }
        end
      end
      map['c_list'] ||= []
      map['certificate'] ||= []
      if(event.get('cid')!=nil)
        if !(map['c_list'].include? event.get('cid'))
          map['c_list'] << event.get('cid')
          map['certificate'] << {
            'cid' => event.get('cid'),
            'certificateName' => event.get('certificateName')
          }
        end
      end
      # Drop the raw row; only the aggregated map is emitted.
      event.cancel()
    "

    # Emit the previous map as a document as soon as rows of a new bsid arrive ...
    push_previous_map_as_event => true
    # ... and flush the last map after 5 seconds without new rows; without this it would only be
    # emitted when Logstash stops.
    timeout => 5
  }
  # Parses the JSON message of stdin test events; jdbc rows have no message field.
  json {
    source => "message"
    remove_field => ["message"]
    #remove_field => ["message", "type", "@timestamp", "@version"]
  }
  mutate {
    # Drop fields that must not be stored in Elasticsearch, including the *_list helper arrays
    # that the aggregate code only uses for de-duplication.
    remove_field => ["tags", "@timestamp", "@version", "w_list", "e_list", "s_list", "c_list"]
  }
}

output {

  #if[type]=="bid"{
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "human"
    document_type => "resumans"
    # One document per staff member; re-syncs overwrite the previous version.
    document_id => "%{bsid}"
  }

  stdout {
    codec => json
  }
  #}

}

2.2 多表关联查询语句（即上面 statement_filepath 所指向的 my.sql 文件内容）
/*
 * Rows for the Logstash jdbc input. Every child table is LEFT JOINed on the staff id, so one staff
 * member yields one row per combination of child rows; the Logstash aggregate filter folds them
 * back into one document per bsid.
 *
 * updateTime is the newest update_time among the staff row and the child rows joined into this
 * row. Missing values count as '1997-1-1 00:00:00', and the result is NULL when all of them are
 * NULL. The CAST keeps the column a DATETIME, which the timestamp tracking column expects.
 * NOTE(review): CAST AS DATETIME drops fractional seconds; confirm update_time has none.
 */
SELECT
    bs.id bsid,
    bs.hope_city_name hopeCityName,
    bs.hope_industry_name hopeIndustryName,
    bs.hope_occupation_name hopeOccupationName,
    bs.job_nature jobNature,
    bs.job_status jobStatus,
    bs.`name`,
    bs.tel,
    bs.create_time createTime,
    bs.sex,
    bs.create_user createUser,
    CAST(NULLIF(GREATEST(
        IFNULL(bs.update_time, '1997-1-1 00:00:00'),
        IFNULL(w.update_time, '1997-1-1 00:00:00'),
        IFNULL(e.update_time, '1997-1-1 00:00:00'),
        IFNULL(c.update_time, '1997-1-1 00:00:00'),
        IFNULL(s.update_time, '1997-1-1 00:00:00'),
        IFNULL(k.update_time, '1997-1-1 00:00:00')
    ), '1997-1-1 00:00:00') AS DATETIME) updateTime,
    w.id wid,
    w.industry_name industryName,
    w.skills_describe skillsDescribe,
    e.id eid,
    e.school_name schoolName,
    e.speciality speciality,
    e.education education,
    c.id cid,
    c.certificate_name certificateName,
    s.id sid,
    s.skill_name skillName,
    u.`name` headhuntingName,
    IF(k.id IS NULL, 0, 1) isBlack
FROM s_enterprise_bid_staff bs
LEFT JOIN s_work_experience w ON bs.id = w.bid_staff_id
LEFT JOIN s_education_experience e ON bs.id = e.bid_staff_id
LEFT JOIN s_certificate c ON bs.id = c.bid_staff_id
LEFT JOIN s_skill s ON bs.id = s.bid_staff_id
LEFT JOIN s_enterprise_user u ON bs.bid_enterprise_id = u.id
/* NOTE(review) - is_delete is unqualified; presumably s_blacklist.is_delete, i.e. only active
   blacklist entries count. Qualify it as k.is_delete once confirmed. */
LEFT JOIN s_blacklist k ON k.bid_staff_id = bs.id AND is_delete = 0
WHERE bs.id IN (
    /* Staff members with at least one row changed since the last run. Unlike the outer query,
       this join keeps blacklist rows regardless of is_delete (presumably so that removing a
       blacklist entry also triggers a re-sync; confirm). */
    SELECT bs.id
    FROM s_enterprise_bid_staff bs
    LEFT JOIN s_work_experience w ON bs.id = w.bid_staff_id
    LEFT JOIN s_education_experience e ON bs.id = e.bid_staff_id
    LEFT JOIN s_certificate c ON bs.id = c.bid_staff_id
    LEFT JOIN s_skill s ON bs.id = s.bid_staff_id
    LEFT JOIN s_blacklist k ON k.bid_staff_id = bs.id
    WHERE CAST(NULLIF(GREATEST(
        IFNULL(bs.update_time, '1997-1-1 00:00:00'),
        IFNULL(w.update_time, '1997-1-1 00:00:00'),
        IFNULL(e.update_time, '1997-1-1 00:00:00'),
        IFNULL(c.update_time, '1997-1-1 00:00:00'),
        IFNULL(s.update_time, '1997-1-1 00:00:00'),
        IFNULL(k.update_time, '1997-1-1 00:00:00')
    ), '1997-1-1 00:00:00') AS DATETIME) > :sql_last_value
)
/* Keeps all rows of one bsid contiguous, which the aggregate filter relies on.
   NOTE(review) - the jdbc input records the tracking column of the LAST returned row as
   sql_last_value, and with this ordering that row need not carry the newest updateTime. */
ORDER BY bs.id DESC

2.3 遇到的问题与解决方案:
2.3.1 Logstash 同步 nested 嵌套类型数据：需要先手动创建对应的 mapping 映射（否则 ES 会把对象数组动态映射为普通 object 类型，而不是 nested）
PUT /human
{
  "mappings": {
    "resumans": {
      "properties": {
        "bsid": {
          "type": "integer"
        },
        "hopeCityName": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_max_word"
        },
        "hopeIndustryName": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_max_word"
        },
        "hopeOccupationName": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_max_word"
        },
        "jobNature": {
          "type": "keyword"
        },
        "name": {
          "type": "keyword"
        },
        "tel": {
          "type": "keyword"
        },
        "createTime": {
          "type": "date"
        },
        "sex": {
          "type": "integer"
        },
        "createUser": {
          "type": "integer"
        },
        "enterpriseId": {
          "type": "integer"
        },
        "headhuntingName": {
          "type": "keyword"
        },
        "isBlack": {
          "type": "integer"
        },
        "workExperience": {
          "type": "nested",
          "properties": {
            "wid": {
              "type": "integer"
            },
            "industryName": {
              "type": "text"
            },
            "skillsDescribe": {
              "type": "text"
            }
          }
        },
        "educationExperiences": {
          "type": "nested",
          "properties": {
            "eid": {
              "type": "integer"
            },
            "schoolName": {
              "type": "text",
              "analyzer": "ik_max_word",
              "search_analyzer": "ik_max_word"
            },
            "speciality": {
              "type": "text",
              "analyzer": "ik_max_word",
              "search_analyzer": "ik_max_word"
            }
          }
        },
        "skill": {
          "type": "nested",
          "properties": {
            "sid": {
              "type": "integer"
            },
            "skillName": {
              "type": "text",
              "analyzer": "ik_max_word",
              "search_analyzer": "ik_max_word"
            }
          }
        },
        "certificate": {
          "type": "nested",
          "properties": {
            "cid": {
              "type": "integer"
            },
            "certificateName": {
              "type": "text",
              "analyzer": "ik_max_word",
              "search_analyzer": "ik_max_word"
            }
          }
        }
      }
    }
  }
}

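2.3.2 Logstash 同步数据时，子对象数组中出现重复对象：
原因：查询语句同时 LEFT JOIN 了多张子表，同一条子表记录会随其他子表记录的每种组合重复出现。解决方法：用一个辅助数组（如 map['w_list']）记录已经加入的 id，只在某个 id 第一次出现时才追加对应对象：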
- # Use map['w_list'] to de-duplicate: it remembers every wid already added, so each work experience is appended only once even though the LEFT JOINs repeat its row
- map['w_list'] ||=[]
- map['workExperience'] ||=[]
- if(event.get('wid')!=nil) # rows without a joined work experience carry a nil wid
- if !(map['w_list'].include? event.get('wid')) # skip wids that were already added
- map['w_list'] << event.get('wid')
- map['workExperience'] << {
- 'wid' => event.get('wid'),
- 'industryName' => event.get('industryName'),
- 'skillsDescribe' => event.get('skillsDescribe')
- }
- end
- end
2.3.3 Logstash 同步时总会少同步最后一条数据，直到停止 Logstash 服务时才写入
解决方法：在 filter 的 aggregate 配置中添加 timeout => 5
原因：开启 push_previous_map_as_event 后，aggregate 只有在遇到下一个不同的 task_id（bsid）时，才会把上一个 map 作为事件推送出去。最后一个 bsid 之后不会再有新的事件，因此它的 map 会一直等待，直到 Logstash 停止时才被推送。设置 timeout 后，超过指定秒数未更新的 map 会被自动推送。这种做法并不严谨，因为无法确定一个 map 需要聚合多久。更好的方式是给出明确的任务结束条件（aggregate 的 end_of_task 选项）。
2.3.4 logstash第一次全量同步数据时会出现子对象数组中的对象丢失
解决方法参考:https://blog.csdn.net/menglinjie/article/details/102984845
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。