int pageSize = 300; int dataCount = 0; int successCount = 0; int noWorkerIdCount = 0; SearchRequest searchRequest = SearchRequest.of(r -> r .index(EsProhibitionPushData.INDEX_NAME) .scroll(Time.of(t -> t.time("5m"))) .sort(s -> s.field(f -> f.field("_id").order(SortOrder.Asc))) .size(pageSize)); 解释一下
这段代码构建了一个 Elasticsearch(ES)搜索请求,主要用于批量、高效地遍历索引数据(特别是数据量较大时)。以下是各部分的详细解释:
1. 变量定义
int pageSize = 300; // 每批次从 ES 获取的数据量
int dataCount = 0; // 统计已处理的数据总数
int successCount = 0; // 成功处理的数据条数
int noWorkerIdCount = 0; // 统计缺失 workerId 的数据条数
这些变量用于分页控制和结果统计。
2. SearchRequest 构建
通过 Elasticsearch Java Client 的 SearchRequest.of() 方法配置查询:
SearchRequest searchRequest = SearchRequest.of(r -> r
.index(EsProhibitionPushData.INDEX_NAME) // 指定查询的索引名称
.scroll(Time.of(t -> t.time("5m"))) // 启用 Scroll 游标查询,保持上下文5分钟
.sort(s -> s.field(f -> f.field("_id").order(SortOrder.Asc))) // 按 _id 升序排序
.size(pageSize)); // 每批次返回 300 条文档
关键参数解析:
| 参数 | 说明 |
|---|---|
index() |
指定要查询的 ES 索引名称,EsProhibitionPushData.INDEX_NAME 是索引名的常量。 |
scroll() |
启用 Scroll API,用于深度分页或批量导出大量数据。"5m" 表示游标保持有效时间为5分钟。 |
sort() |
按 _id 字段升序排序,确保遍历顺序一致,适合顺序处理。 |
size() |
每批次返回的文档数(这里设为 pageSize=300)。 |
3. 应用场景
这种配置通常用于:
- 数据迁移/导出:需要安全遍历大量数据而不影响集群性能。
- 批量处理:逐批获取数据并进行业务操作(如更新、统计、校验等)。
- 替代传统分页:避免
from/size深度分页的性能问题。
4. 后续操作提示
构建 SearchRequest 后,通常会:
- 发送请求获取第一批数据及
scroll_id。 - 循环使用
scroll_id获取后续批次,直到数据全部取完。 - 每批处理完成后,更新
dataCount、successCount等统计变量。
如果有进一步的使用场景或问题,可以继续补充说明。
scroll() 启用 Scroll API,用于深度分页或批量导出大量数据。"5m"表示游标保持有效时间为5分钟。 一次返回多少数据?
一次返回的数据量由 .size(pageSize) 决定,这里 pageSize = 300,所以每次 Scroll 请求返回 300 条文档。
详细说明:
| 配置项 | 作用 | 示例值 |
|---|---|---|
.size(pageSize) |
控制单次响应中包含的文档数量 | 300 |
.scroll("5m") |
设置 Scroll 上下文存活时间,用于保持搜索状态,以便获取后续批次 | 5分钟 |
工作流程:
- 首次搜索:发送这个
SearchRequest,ES 会返回:- 第一批 300 条数据(由
.size(300)指定) - 一个
scroll_id(用于获取下一批)
- 第一批 300 条数据(由
- 后续滚动:使用
scroll_id请求下一批,每次继续返回 300 条,直到数据全部取完。 - 上下文过期:如果两次请求间隔超过 5 分钟,Scroll 上下文自动失效,未取完的数据将无法继续获取。
类比理解:
| 行为 | 传统分页 | Scroll 分页 |
|---|---|---|
| 单次获取量 | from=0, size=300 第1页300条 |
固定 size=300,每批300条 |
| 翻页方式 | 修改 from 值(如 from=300) |
使用 scroll_id 获取下一批 |
| 适用场景 | 浅分页(如用户界面翻页) | 深度遍历/批量处理(如导出全部数据) |
注意事项:
- 性能:Scroll 会保存搜索时的快照,占用 ES 资源,不建议用于实时用户请求,适合后台批处理。
- 超时:
"5m"是 滚动上下文保持时间,如果处理太慢导致超时,需要续期或加快处理速度。
如果需要调整每次返回的数据量,只需修改 pageSize 变量即可。
scroll_id获取下一批 ,排序还会生效吗?数据会重复吗?
排序会继续生效,且数据不会重复。
核心机制解释:
| 特性 | 说明 |
|---|---|
| 排序保持 | 首次请求的排序规则(这里按 _id 升序)会在整个 Scroll 会话中持续生效。后续批次会严格按照此顺序继续返回。 |
| 无重复 | ES 内部维护一个游标指针,每次返回下一批数据后指针后移,不会重复返回已取过的数据。 |
| 快照视图 | Scroll 启动时会创建数据快照,后续滚动都基于此快照,即使期间有数据写入/修改,也不会影响已返回的顺序和内容。 |
示例流程(假设 1000 条数据,pageSize=300):
- 首次请求:返回
_id最小的 300 条(如id_001到id_300),同时获得scroll_id。 - 第二次滚动:使用
scroll_id请求,返回接下来 300 条(id_301到id_600)。 - 第三次滚动:返回
id_601到id_900。 - 第四次滚动:返回剩余 100 条(
id_901到id_1000),再下次返回空数组表示结束。
注意事项:
| 场景 | 影响 |
|---|---|
| 数据更新 | 在 Scroll 过程中,新写入的数据不会出现在结果中(因为基于快照)。 |
| 数据修改/删除 | 已删除的文档在后续批次中可能以空值占位(取决于 ES 版本),但不会导致顺序错乱或重复。 |
| 超时重试 | 如果 Scroll 上下文过期(超过5分钟),需要重新发起初始请求,从头开始遍历。 |
类比理解:
想象一本固定页码的书(快照):
- 第一次翻到第1-300页(排序已定)。
- 下次直接从第301页继续,不会回到前面,也不会跳过页面。
总结:Scroll 设计就是为了安全、有序、无重复地遍历大量数据,无需担心排序失效或重复问题。
不建议用于实时用户请求 ?
是的,强烈不建议将 Scroll API 用于实时用户请求(如网站或App的前端分页查询)。它专为后台批处理设计,用于实时请求会产生严重问题。
主要原因:
| 问题 | 说明 |
|---|---|
| 1. 数据非实时 | Scroll 基于搜索初始时刻的快照。用户翻页过程中,即使有新数据写入或被修改,他也看不到,得到的是“过时”结果。这违背了实时查询的预期。 |
| 2. 资源消耗大 | 每个 Scroll 会话都会在 ES 集群中占用大量内存和文件句柄来维持上下文。大量用户同时发起实时请求,会瞬间拖垮集群。 |
| 3. 超时与状态管理复杂 | 需要维护 scroll_id 并严格控制生存时间。在用户不可控的网络环境下,连接极易超时,导致翻页中断,体验极差。 |
正确工具选择:
根据您的场景,应使用不同的 ES 分页功能:
| 场景 | 推荐方案 | 简要说明 |
|---|---|---|
| 用户界面实时分页<br>(如“第1页/共N页”) | from + size |
适用于浅分页(如 from=0, size=20)。深度分页(如第1000页)性能会急剧下降,因为 from 值过大会导致全局排序和跳过成本极高。 |
| 用户界面深度浏览<br>(如无限滚动、连续读取) | search_after |
当前最佳实践。配合排序字段(如时间戳和唯一ID),提供稳定的实时游标。性能高,且能反映查询时刻的最新数据。 |
| 后台批量处理/全量导出<br>(如数据分析、数据同步) | Scroll API 或 Point In Time (PIT) + search_after |
这正是您代码中的场景。适合在后台安静地处理大量数据,不追求实时性,但要求稳定、完整地遍历。 |
举例说明:
错误用法(用户翻页):
// 用户点击“下一页”时
SearchRequest request = SearchRequest.of(r -> r
.index("products")
.scroll("1m") // ❌ 为每个用户请求创建 Scroll,集群很快崩溃
.size(20)
);
正确用法(用户翻页):
// 使用 search_after 实现
SearchRequest request = SearchRequest.of(r -> r
.index("products")
.sort(s -> s.field(f -> f.field("price").order(SortOrder.Asc)))
.sort(s -> s.field(f -> f.field("_id").order(SortOrder.Asc))) // 确保顺序唯一
.size(20)
.search_after(List.of(lastPrice, lastId)) // 传入上一页最后一条的排序值
);
总结对比表:
| 特性 | Scroll API | from/size |
search_after |
|---|---|---|---|
| 实时性 | 快照(非实时) | 实时 | 实时 |
| 资源占用 | 很高(需维护上下文) | 低(浅分页时)<br>高(深度分页时) | 低(与 from 无关) |
| 适用场景 | 后台批处理/全量导出 | 用户界面浅分页(如前10页) | 用户界面深度浏览/无限滚动 |
| 数据一致性 | 搜索开始时的一致快照 | 每次查询时的最新数据 | 每次查询时的最新数据 |
结论:您的原始代码使用 Scroll API 是完全正确的,因为它显然是后台批处理任务(统计计数、遍历全量数据)。但请务必将其与服务实时用户请求的代码区分开,后者应选择 search_after 或浅分页。