赞
踩
在初始化查询阶段(query phase),查询被向索引中的每个分片副本(原本或副本)广播。每个分片在本地执行搜索并且建 立了匹配 document 的优先队列(priority queue)。
优先队列:一个优先队列(priority queue)只是一个存有前n个(top-n)匹配document的有序列表。这个优先队列的大小由分页参数 from + size 决定。

查询阶段
当一个搜索请求被发送到一个节点Node,这个节点就变成了协调节点。这个节点的工作是向所有相关的分片广播搜索请求并且把它们的响应整合成一个全局的有序结果集。对于后续请求,协调节点会轮询所有 的分片副本以分摊负载。每一个分片在本地执行查询和建立一个长度为 from+size 的有序优先队列——这个长度意味着它自己的结果数量就足够满 全局的请求要求。分片返回一个轻量级的结果列表给协调节点。只包含documentID值和排序需要用到的值,例如 _score 。
查询阶段辨别出那些满足搜索请求的document,但我们仍然需要取回那些document本身。这就是取回阶段的工作,如图分布式搜索的取回阶段所示。

取回阶段
协调节点为每个持有相关document的分片建立多点get请求然后发送请求到处理查询阶段的分片副本。 一旦协调节点收到所有结果,会将它们汇集到单一的回答响应里,这个响应将会返回给客户端。
我们看一下分布式存储系统中分页查询的过程:
可以看到,在分布式系统中,对结果排序的成本随分页的深度成指数上升。这就是 web 搜索引擎对任何查询都不要返回超过 10000 个结果的原因。
为了限制 from + size 分页的深度,ElasticSearch 的分页窗口默认最多允许 10000 条数据,即 在每页 20 条数据的情况,最多可以分 500 页,超过后报错
ES 支持的三种分页查询方式
说明: 官方已经不再推荐采用Scroll API进行深度分页。如果遇到超过10000的深度分页,推荐用search_after + PIT
index:shopping
@Test
public void saveAll(){
List<Product> productList = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
Product product = new Product();
product.setId(Long.valueOf(i));
product.setTitle("["+i+"]小米手机");
product.setCategory("手机");
product.setPrice(1999.0 + i);
product.setImages("http://www.test/xm.jpg");
productList.add(product);
}
productDao.saveAll(productList);
}
通过 from和 size是 ElasticSearch 最常用的分页方式,可以类比 MySQL 的 LIMIT start,limit
from:未指定,默认值是 0,注意不是1,代表当前页返回数据的起始值。
size: 未指定,默认值是 10,代表当前页返回数据的条数。
POST shopping/_search
{
"from": 0,
"size": 10,
"query": {
"match_all": {}
},
"sort": [
{"id": "asc"}
]
}
Java Client
/** * 分页查询 FROM + SIZE 查询 * @param currentPage 当前页,第一页从 0 开始, 1 表示第二页 * @param pageSize 每页显示多少条 */ public void findByPageable(int currentPage,int pageSize){ //设置排序(排序方式,正序还是倒序,排序的 id) Sort sort = Sort.by(Sort.Direction.ASC,"id"); //设置查询分页 PageRequest pageRequest = PageRequest.of(currentPage, pageSize,sort); //分页查询 Page<Product> productPage = productDao.findAll(pageRequest); for (Product Product : productPage.getContent()) { System.out.println(Product); } }
测试分页窗口限制
// 这是ElasticSearch最简单的分页查询,但以上命令是会报错的。 POST shopping/_search { "from": 10000, "size": 10, "query": { "match_all": {} }, "sort": [ {"id": "asc"} ] } // 因为size + from > 10000 所有导致报错 "error" : { "root_cause" : [ { "type" : "illegal_argument_exception", "reason" : "Result window is too large, from + size must be less than or equal to: [10000] but was [10010]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting." } ], "type" : "search_phase_execution_exception", }
怎么解决这个问题,首先能想到的就是调大这个window。
PUT user/_settings
{
"index" : {
"max_result_window" : 20000
}
}
然后这种方式只能暂时解决问题,当es 的使用越来越多,数据量越来越大,深度分页的场景越来越复杂时,如何解决这种问题呢?
官方建议:
避免过度使用 from 和 size 来分页或一次请求太多结果。
不推荐使用 from + size 做深度分页查询的核心原因:
使用search_after 进行分页 相比 from & size 的方式要更加高效,而且在不断有新数据入库的时候仅仅使用 from 和 size 分页会有重复的情况,相比使用 scroll 分页,search_after 可以进行实时的查询,不过 search_after 不适合跳跃式的分页。
使用 search_after 类比 SQL,相当于
# 查询shopping num > 0 的前5条数据 并且不会走索引
SELECT * FROM shopping WHERE ORDER BY num > 0 ASC LIMIT 10
# 优化:获取返回列表中的最后一个 num,即 最大的 num,定为 {before_max_num}
SELECT * FROM shopping WHERE num > {before_max_num} ORDER BY num ASC LIMIT 5
但是 search_after 参数使用上一页中的一组排序值来检索下一页的数据。(增加一个条件查询 排序值 > 上一页排序值 )使用 search_after 需要具有相同查询和排序值的多个搜索请求。 如果在这些请求之间发生刷新,结果的顺序可能会发生变化,从而导致跨页面的结果不一致。 为防止出现这种情况,您可以创建一个时间点 (PIT) 以保留搜索中的当前索引状态。
时间点 Point In Time(PIT)保障搜索过程中保留特定事件点的索引状态。
注意⚠️:
es 给出了 search_after 的方式,这是在 >= 5.0 版本才提供的功能。
Point In Time(PIT)是 Elasticsearch 7.10 版本之后才有的新特性。
PIT的本质:存储索引数据状态的轻量级视图。
如下示例能很好的解读 PIT 视图的内涵。
// 1.给索引user_index创建 pit POST /shopping/_pit?keep_alive=5m // 2. 统计当前记录数 5 POST /shopping/_count // 3. 根据pit统计当前记录数 5 GET /_search { "query": { "match_all": {} }, "pit": { "id": "i6-xAwEKdXNlcl9pbmRleBZYTXdtSFRHeVJrZVhCby1OTjlHMS1nABZ0TEpMcVRuNFRxaWI4cXFTVERhOHR3AAAAAAAAIODBFmdBWEd2UmFVVGllZldNdnhPZDJmX0EBFlhNd21IVEd5UmtlWEJvLU5OOUcxLWcAAA==", "keep_alive": "5m" }, "sort": [ {"id": "asc"} ] } // 4. 插入一条数据 POST shopping/_bulk { "create": { "_id": "6" }} { "id":6,"name":"奶瓶"} // 5. 数据总量 6 POST /shopping/_count // 6. 根据pit统计数据总量还是 5 ,说明是根据时间点的视图进行统计。 GET /_search { "query": { "match_all": {} }, "pit": { "id": "i6-xAwEKdXNlcl9pbmRleBZYTXdtSFRHeVJrZVhCby1OTjlHMS1nABZ0TEpMcVRuNFRxaWI4cXFTVERhOHR3AAAAAAAAIODBFmdBWEd2UmFVVGllZldNdnhPZDJmX0EBFlhNd21IVEd5UmtlWEJvLU5OOUcxLWcAAA==", "keep_alive": "5m" }, "sort": [ {"id": "asc"} ] }
有了 PIT,search_after 的后续查询都是基于 PIT 视图进行,能有效保障数据的一致性。
//1. 获取索引的pit POST /shopping/_pit?keep_alive=5m //2. 根据 pit 首次查询 根据 pit 查询的时候,不用指定索引名称。 GET /_search { "size": 1, "from": 0,//注意from要从0开始 "query": { "match_all": {} }, "pit": { "id": "i6-xAwEKdXNlcl9pbmRleBZYTXdtSFRHeVJrZVhCby1OTjlHMS1nABZ0TEpMcVRuNFRxaWI4cXFTVERhOHR3AAAAAAAAIODBFmdBWEd2UmFVVGllZldNdnhPZDJmX0EBFlhNd21IVEd5UmtlWEJvLU5OOUcxLWcAAA==", "keep_alive": "1m" }, "sort": [ {"id": "asc"} ] } //查询结果 "hits" : [ { "_index" : "shopping", "_type" : "_doc", "_id" : "0", "_score" : null, "_source" : { "_class" : "com.caffee.es.model.Product", "id" : 0, "title" : "[0]小米手机", "category" : "手机", "price" : 1999.0, "images" : "http://www.test/xm.jpg" }, "sort" : [ 0, 4294967296 ] } ] //3. 根据search_after和pit进行翻页查询: search_after指定为上一次查询返回的sort值。 GET /_search { "size": 1, "query": { "match_all": {} }, "pit": { "id": "i6-xAwEKdXNlcl9pbmRleBZYTXdtSFRHeVJrZVhCby1OTjlHMS1nABZ0TEpMcVRuNFRxaWI4cXFTVERhOHR3AAAAAAAAIOJ7FmdBWEd2UmFVVGllZldNdnhPZDJmX0EBFlhNd21IVEd5UmtlWEJvLU5OOUcxLWcAAA==", "keep_alive": "5m" }, "sort": [ {"id": "asc"} ], "search_after": [ 0 ] } //查询结果 "hits" : [ { "_index" : "shopping", "_type" : "_doc", "_id" : "1", "_score" : null, "_source" : { "_class" : "com.caffee.es.model.Product", "id" : 1, "title" : "[1]小米手机", "category" : "手机", "price" : 2000.0, "images" : "http://www.test/xm.jpg" }, "sort" : [ 1 ] } ]
- 带有 pit 参数的搜索请求不得指定 index、routing 和 preference,因为这些参数是从时间点复制的。
- id 参数告诉 Elasticsearch 从这个时间点使用上下文执行请求。
- keep_alive 参数告诉 Elasticsearch 应该将时间点的生存时间延长多长时间。
Java Client
/** * 分页查询 search_after + SIZE + PIT 查询 * @param indices 索引名用于创建PIT * @param sortNum 排序值 * @param pageSize 页数 * @return * @throws Exception */ public Integer findByPageableBySearchAfterPIT(String indices,int sortNum,int pageSize) throws Exception { // 1. 创建时间点,过期时间5分钟 String pitId = createPit(indices,5); // 2.结合 search after 和 PIT ID 进行深度分页 final PointInTimeBuilder pitBuilder = new PointInTimeBuilder(pitId); // 3.创建搜索条件 final SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.searchSource() .pointInTimeBuilder(pitBuilder) // 指定 pit .from(0) .size(pageSize) .searchAfter(new Object[]{sortNum}) .sort("id", SortOrder.ASC); SearchRequest searchRequest = new SearchRequest();//indices 无需指定索引名 searchRequest.source(searchSourceBuilder); //4. 获取结果 SearchResponse search = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); Object[] arrays = new Object[1]; System.out.println(search.getHits().getHits()); for(SearchHit hit : search.getHits().getHits()){ Map<String, Object> map = hit.getSourceAsMap(); System.out.println(JSONObject.toJSONString(map)); System.out.println(hit.getSortValues()[0]); arrays = hit.getSortValues(); } System.out.println("sort值" + arrays[0]); // 最后关闭 Point In Time final ClosePointInTimeRequest closePointInTimeRequest = new ClosePointInTimeRequest(pitId); restHighLevelClient.closePointInTime(closePointInTimeRequest,RequestOptions.DEFAULT); if(arrays[0] == null) return null; else return Integer.parseInt(arrays[0].toString()); } /** * 创建 PIT OpenPointInTimeRequest支持版本:highlevelclient7.16以上 * @param indices 索引名 * @param keep_alive 存活时间 单位:分钟 * @throws Exception */ private String createPit(String indices,int keep_alive) throws Exception{ // 构造 pit open Request //1. 根据索引创建时间点 final OpenPointInTimeRequest pitRequest = new OpenPointInTimeRequest(indices); //2. 设置存活时间 pitRequest.keepAlive(TimeValue.timeValueMinutes(keep_alive)); //打开 pit 获取 pitId final OpenPointInTimeResponse pitResponse = restHighLevelClient.openPointInTime(pitRequest, RequestOptions.DEFAULT); //3. 读取返回的时间点 id final String pitId = pitResponse.getPointInTimeId(); return pitId; }
思考 本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。