赞
踩
记得好几年前用 ES 做过标签画像统计，如今再看 ES 时已经很生疏了，而它也已经更新到了 7.12 版本。以前用的是 TransportClient 客户端，现在官方推荐使用的是 RestHighLevelClient 客户端。
这几天用下来，感觉 RestHighLevelClient 还是比较方便的。现将一些常用的基本功能记录如下。
1.初始化和关闭 public static RestHighLevelClient getClient(String host, int port) { LOGGER.info("Init ES!"); client \= new RestHighLevelClient( RestClient.builder(new HttpHost(host, port, "http"))); return client; } public static void closeES() { LOGGER.info("ES closed!"); if(null != client) { try { client.close(); } catch (IOException e) { e.printStackTrace(); } } } 2.创建index以及mapping public boolean createIndexMapping(RestHighLevelClient client, String indexName){ LOGGER.info("create index and mapping ..."); CreateIndexRequest createIndexRequest \= new CreateIndexRequest(indexName); createIndexRequest.settings(Settings.builder() .put("index.number\_of\_shards",1) .put("index.number\_of\_replicas",0)); try { XContentBuilder xContentBuilder \= XContentFactory.jsonBuilder().startObject().startObject("properties") .startObject("content") .field("type", "text") // 数据类型 .field("index", "true") //默认 .field("analyzer", "ik\_max\_word") .field("search\_analyzer", "ik\_smart") .endObject() .startObject("date") .field("type", "date") // 数据类型 .field("index", "true") //默认 .endObject() .startObject("title") .field("type", "text") // 数据类型 .field("index", "true") //默认 .field("analyzer", "ik\_max\_word") .field("search\_analyzer", "ik\_smart") .endObject() .endObject() .endObject(); createIndexRequest.mapping(xContentBuilder); CreateIndexResponse createIndexResponse \= client.indices().create(createIndexRequest, RequestOptions.DEFAULT); return createIndexResponse.isAcknowledged(); } catch (IOException e) { e.printStackTrace(); } return false; } 3.获取一篇doc public Map<String, Object> getOneMap(RestHighLevelClient client, String index, String id) { GetRequest getRequest \= new GetRequest(index, id); GetResponse getResponse \= null; try { getResponse \= client.get(getRequest, RequestOptions.DEFAULT); if(getResponse.isExists()) { return getResponse.getSource(); } } catch (IOException e) { e.printStackTrace(); } return null; } 4.删除多篇 /\*\* \* 批量删除 \* @param client \* @param index \* @param ids 
\* @return \*/ public Object deleteList(RestHighLevelClient client, String index, List<String> ids) { //构建批量删除请求 DeleteByQueryRequest request = new DeleteByQueryRequest(index); IdsQueryBuilder queryBuilder \= new IdsQueryBuilder(); for(String id: ids) { queryBuilder.addIds(id); } // 匹配所有 request.setQuery(queryBuilder); BulkByScrollResponse response \= null; try { response \= client.deleteByQuery(request, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } return JSONObject.toJSON(response); } 5.批量导入 /\*\* \* 批量导入 \* @param client \* @param index \* @param list \* @return \*/ public boolean insertDocByBulk(RestHighLevelClient client, String index, List<Doc> list) { //批量插入请求 BulkRequest bulkRequest = new BulkRequest(); bulkRequest.timeout("10s"); for(int i = 0; i < list.size(); i++) { Doc doc \= list.get(i); //这里必须每次都使用new IndexRequest(index,type),不然只会插入最后一条记录(这样插入不会覆盖已经存在的Id,也就是不能更新) Map<String, Object> kv = new HashMap<>(); kv.put("id", doc.getId()); kv.put("title", doc.getTitle()); kv.put("content", doc.getContent()); bulkRequest.add(new IndexRequest(index).id(String.valueOf(doc.getId())).source(kv)); //或者 //bulkRequest.add(new IndexRequest(index).id(item.getID()).source(JSON.toJSONString(doc), XContentType.JSON)); } try { // 客户端返回 BulkResponse responses = client.bulk(bulkRequest, RequestOptions.DEFAULT); // responses.hasFailures(); // 是否失败,false表示成功! 
if(RestStatus.CREATED == responses.status()) { return true; } } catch (IOException e) { e.printStackTrace(); } return false; } 6.批量更新 /\*\* \* 批量update \* @param client \* @param index \* @param list \* @return \*/ public boolean updateDocByBulk(RestHighLevelClient client, String index, List<Doc> list) { //批量插入请求 BulkRequest bulkRequest = new BulkRequest(); bulkRequest.timeout("10s"); for(int i = 0; i < list.size(); i++) { Doc doc \= list.get(i); //这里必须每次都使用new IndexRequest(index,type),不然只会插入最后一条记录(这样插入不会覆盖已经存在的Id,也就是不能更新) Map<String, Object> kv = new HashMap<>(); kv.put("id", doc.getId()); kv.put("title", doc.getTitle()); kv.put("content", doc.getContent()); bulkRequest.add(new UpdateRequest().index(index).id(String.valueOf(doc.getId())).doc(kv)); } try { // 客户端返回 BulkResponse responses = client.bulk(bulkRequest, RequestOptions.DEFAULT); // responses.hasFailures(); // 是否失败,false表示成功! if(RestStatus.OK == responses.status()) { return true; } } catch (IOException e) { e.printStackTrace(); } return false; } 7.全量搜索 7.1 利用scroll /\*\* \* 全量搜索 \* @param client \* @param index \* @param field1 \* @param field2 \* @param query \* @param size \* @param include \* @param exclude \* @return \*/ public List<org.elasticsearch.search.SearchHit> searchByQueryScrollAll(RestHighLevelClient client, String index, String field1, String field2, String query, int size, String\[\] include, String\[\] exclude) { List<org.elasticsearch.search.SearchHit> result = CollectionUtil.newArrayList(); final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L)); SearchRequest searchRequest \= new SearchRequest(index); searchRequest.scroll(scroll); SearchSourceBuilder searchSourceBuilder \= new SearchSourceBuilder(); // 高亮显示 HighlightBuilder highlightBuilder = new HighlightBuilder(); // 高亮标签 highlightBuilder.preTags("<a style='color: #e4393c'>"); highlightBuilder.postTags("</a>"); // 高亮字段 highlightBuilder.field(field2); //设置最多一次能够取出(size)笔数据,从第(size + 1)笔数据开始,将开启滚动查询。 
(滚动查询也属于这一次查询,只不过因为一次查不完,分多次查) searchSourceBuilder.size(size); //searchSourceBuilder.sort("\_score", SortOrder.DESC); //socre相同,则按时间降序排序 //searchSourceBuilder.sort("publish\_date", SortOrder.DESC); //高亮显示添加到构造器(不需要高亮显示则不添加) searchSourceBuilder.highlighter(highlightBuilder); // 多字段联合查询 //searchSourceBuilder.query(QueryBuilders.multiMatchQuery(query, field1, field2)); searchSourceBuilder.query(QueryBuilders.boolQuery() .should(QueryBuilders.matchQuery(field1, query)) .must(QueryBuilders.matchQuery(field2, query))); searchSourceBuilder.fetchSource(include, exclude); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse \= null; try { searchResponse \= client.search(searchRequest, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } String scrollId \= searchResponse.getScrollId(); org.elasticsearch.search.SearchHit\[\] searchHits \= searchResponse.getHits().getHits(); while (searchHits != null && searchHits.length > 0) { for(org.elasticsearch.search.SearchHit hit: searchHits) { //String highlightText = hit.getHighlightFields().get(field2).getFragments()\[0\].toString(); result.add(hit); } SearchScrollRequest searchScrollRequest \= new SearchScrollRequest(scrollId); searchScrollRequest.scroll(scroll); try { searchResponse \= client.scroll(searchScrollRequest, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } scrollId \= searchResponse.getScrollId(); searchHits \= searchResponse.getHits().getHits(); } if(null != scrollId) { ClearScrollRequest clearScrollRequest \= new ClearScrollRequest(); clearScrollRequest.addScrollId(scrollId); // 滚动完成后清除滚动上下文 ClearScrollResponse clearScrollResponse = null; try { clearScrollResponse \= client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } //清除滚动是否成功 boolean succeeded = clearScrollResponse.isSucceeded(); } return result; } 7.2利用after /\*\* \* 全量搜索 \* @param client \* @param index \* @param field1 \* @param field2 
\* @param query \* @param size \* @param include \* @param exclude \* @return \*/ public List<org.elasticsearch.search.SearchHit> searchByQuerySearchAfter(RestHighLevelClient client, String index, String field1, String field2, String query, int size, String\[\] include, String\[\] exclude) { List<org.elasticsearch.search.SearchHit> result = CollectionUtil.newArrayList(); SearchRequest request \= new SearchRequest(index); SearchSourceBuilder searchSourceBuilder \= new SearchSourceBuilder(); // 高亮显示 HighlightBuilder highlightBuilder = new HighlightBuilder(); // 高亮标签 highlightBuilder.preTags("<a style='color: #e4393c'>"); highlightBuilder.postTags("</a>"); // 高亮字段 highlightBuilder.field(field2); QueryBuilder queryBuilder \= QueryBuilders.boolQuery() .should(QueryBuilders.matchQuery(field1, query)) .must(QueryBuilders.matchQuery(field2, query)); // searchSourceBuilder.query(QueryBuilders.matchQuery(field, query)); searchSourceBuilder.query(queryBuilder); searchSourceBuilder.fetchSource(include, exclude); //每页显示条数 searchSourceBuilder.size(size); // 需要唯一不重复的字段作为排序 searchSourceBuilder.sort("\_id", SortOrder.DESC); //searchSourceBuilder.sort("\_score", SortOrder.DESC); //score相同,则按时间降序排序 //searchSourceBuilder.sort("publish\_date", SortOrder.DESC); //高亮显示添加到构造器(不需要高亮显示则不添加) searchSourceBuilder.highlighter(highlightBuilder); //构造器添加到搜索请求 request.source(searchSourceBuilder); //客户端返回 SearchResponse response = null; try { response \= client.search(request, RequestOptions.DEFAULT); //搜索结果 org.elasticsearch.search.SearchHit\[\] hits = response.getHits().getHits(); while(hits.length > 0) { for(org.elasticsearch.search.SearchHit hit : hits) { result.add(hit); } org.elasticsearch.search.SearchHit last \= hits\[hits.length - 1\]; searchSourceBuilder.searchAfter(last.getSortValues()); response \= client.search(request, RequestOptions.DEFAULT); hits \= response.getHits().getHits(); } } catch (IOException e) { e.printStackTrace(); } return result; } 8.统计分析 /\*\* \* 多field统计 \* @param 
client \* @param index \* @param query \* @param field1 \* @param field2 \* @param aggFields \*/ public static Map<String, Map<String, Long>> countDocByTermsAgg(RestHighLevelClient client, String index, String query, String field1, String field2, String ... aggFields) { SearchSourceBuilder searchSourceBuilder \= new SearchSourceBuilder(); searchSourceBuilder.fetchSource(false); SearchRequest request \= new SearchRequest(index); QueryBuilder queryBuilder \= QueryBuilders.boolQuery() .should(QueryBuilders.matchQuery(field1, query)) .must(QueryBuilders.matchQuery(field2, query)); searchSourceBuilder.query(queryBuilder); Map<String, Map<String, Long>> fieldAggMap = CollectionUtil.newLinkedHashMap(); TermsAggregationBuilder aggregationBuilder; SearchResponse response \= null; for(String fieldName : aggFields) { aggregationBuilder \= AggregationBuilders.terms("agg\_name").field(fieldName); searchSourceBuilder.aggregation(aggregationBuilder); request.source(searchSourceBuilder); searchSourceBuilder.size(0); try { response \= client.search(request, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } Aggregations aggregations \= response.getAggregations(); Terms byTopicAggregation \= aggregations.get("agg\_name"); List<? extends Terms.Bucket> buckets = byTopicAggregation.getBuckets(); Map<String, Long> bucketsFieldsAgg = CollectionUtil.newLinkedHashMap(); buckets.forEach(b \-> bucketsFieldsAgg.put(b.getKeyAsString(), b.getDocCount()) ); fieldAggMap.put(fieldName, bucketsFieldsAgg); } return fieldAggMap; }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。