Elasticsearch Scroll API实现高效批量数据遍历

小豆丁 19小时前 ⋅ 6 阅读
聊天记录
头像

int pageSize = 300; int dataCount = 0; int successCount = 0; int noWorkerIdCount = 0; SearchRequest searchRequest = SearchRequest.of(r -> r .index(EsProhibitionPushData.INDEX_NAME) .scroll(Time.of(t -> t.time("5m"))) .sort(s -> s.field(f -> f.field("_id").order(SortOrder.Asc))) .size(pageSize)); 解释一下

头像

这段代码构建了一个 Elasticsearch(ES)搜索请求,主要用于批量、高效地遍历索引数据(特别是数据量较大时)。以下是各部分的详细解释:


1. 变量定义

JAVA
int pageSize = 300;            // 每批次从 ES 获取的数据量
int dataCount = 0;             // 统计已处理的数据总数
int successCount = 0;          // 成功处理的数据条数
int noWorkerIdCount = 0;       // 统计缺失 workerId 的数据条数

这些变量用于分页控制结果统计


2. SearchRequest 构建

通过 Elasticsearch Java Client 的 SearchRequest.of() 方法配置查询:

JAVA
SearchRequest searchRequest = SearchRequest.of(r -> r
    .index(EsProhibitionPushData.INDEX_NAME)   // 指定查询的索引名称
    .scroll(Time.of(t -> t.time("5m")))        // 启用 Scroll 游标查询,保持上下文5分钟
    .sort(s -> s.field(f -> f.field("_id").order(SortOrder.Asc))) // 按 _id 升序排序
    .size(pageSize));                          // 每批次返回 300 条文档

关键参数解析:

参数 说明
index() 指定要查询的 ES 索引名称,EsProhibitionPushData.INDEX_NAME 是索引名的常量。
scroll() 启用 Scroll API,用于深度分页或批量导出大量数据。"5m" 表示游标保持有效时间为5分钟。
sort() _id 字段升序排序,确保遍历顺序一致,适合顺序处理。
size() 每批次返回的文档数(这里设为 pageSize=300)。

3. 应用场景

这种配置通常用于:

  • 数据迁移/导出:需要安全遍历大量数据而不影响集群性能。
  • 批量处理:逐批获取数据并进行业务操作(如更新、统计、校验等)。
  • 替代传统分页:避免 from/size 深度分页的性能问题。

4. 后续操作提示

构建 SearchRequest 后,通常会:

  1. 发送请求获取第一批数据及 scroll_id
  2. 循环使用 scroll_id 获取后续批次,直到数据全部取完。
  3. 每批处理完成后,更新 dataCountsuccessCount 等统计变量。

如果有进一步的使用场景或问题,可以继续补充说明。

头像

scroll() 启用 Scroll API,用于深度分页或批量导出大量数据。"5m"表示游标保持有效时间为5分钟。 一次返回多少数据?

头像

一次返回的数据量由 .size(pageSize) 决定,这里 pageSize = 300,所以每次 Scroll 请求返回 300 条文档


详细说明:

配置项 作用 示例值
.size(pageSize) 控制单次响应中包含的文档数量 300
.scroll("5m") 设置 Scroll 上下文存活时间,用于保持搜索状态,以便获取后续批次 5分钟

工作流程:

  1. 首次搜索:发送这个 SearchRequest,ES 会返回:
    • 第一批 300 条数据(由 .size(300) 指定)
    • 一个 scroll_id(用于获取下一批)
  2. 后续滚动:使用 scroll_id 请求下一批,每次继续返回 300 条,直到数据全部取完。
  3. 上下文过期:如果两次请求间隔超过 5 分钟,Scroll 上下文自动失效,未取完的数据将无法继续获取。

类比理解:

行为 传统分页 Scroll 分页
单次获取量 from=0, size=300 第1页300条 固定 size=300,每批300条
翻页方式 修改 from 值(如 from=300 使用 scroll_id 获取下一批
适用场景 浅分页(如用户界面翻页) 深度遍历/批量处理(如导出全部数据)

注意事项:

  • 性能:Scroll 会保存搜索时的快照,占用 ES 资源,不建议用于实时用户请求,适合后台批处理。
  • 超时"5m"滚动上下文保持时间,如果处理太慢导致超时,需要续期或加快处理速度。

如果需要调整每次返回的数据量,只需修改 pageSize 变量即可。

头像

scroll_id获取下一批 ,排序还会生效吗?数据会重复吗?

头像

排序会继续生效,且数据不会重复。


核心机制解释:

特性 说明
排序保持 首次请求的排序规则(这里按 _id 升序)会在整个 Scroll 会话中持续生效。后续批次会严格按照此顺序继续返回。
无重复 ES 内部维护一个游标指针,每次返回下一批数据后指针后移,不会重复返回已取过的数据。
快照视图 Scroll 启动时会创建数据快照,后续滚动都基于此快照,即使期间有数据写入/修改,也不会影响已返回的顺序和内容。

示例流程(假设 1000 条数据,pageSize=300):

  1. 首次请求:返回 _id 最小的 300 条(如 id_001id_300),同时获得 scroll_id
  2. 第二次滚动:使用 scroll_id 请求,返回接下来 300 条(id_301id_600)。
  3. 第三次滚动:返回 id_601id_900
  4. 第四次滚动:返回剩余 100 条(id_901id_1000),再下次返回空数组表示结束。

注意事项:

场景 影响
数据更新 在 Scroll 过程中,新写入的数据不会出现在结果中(因为基于快照)。
数据修改/删除 已删除的文档在后续批次中可能以空值占位(取决于 ES 版本),但不会导致顺序错乱或重复。
超时重试 如果 Scroll 上下文过期(超过5分钟),需要重新发起初始请求,从头开始遍历。

类比理解:

想象一本固定页码的书(快照):

  • 第一次翻到第1-300页(排序已定)。
  • 下次直接从第301页继续,不会回到前面,也不会跳过页面。

总结:Scroll 设计就是为了安全、有序、无重复地遍历大量数据,无需担心排序失效或重复问题。

头像

不建议用于实时用户请求 ?

头像

是的,强烈不建议将 Scroll API 用于实时用户请求(如网站或App的前端分页查询)。它专为后台批处理设计,用于实时请求会产生严重问题。


主要原因:

问题 说明
1. 数据非实时 Scroll 基于搜索初始时刻的快照。用户翻页过程中,即使有新数据写入或被修改,他也看不到,得到的是“过时”结果。这违背了实时查询的预期。
2. 资源消耗大 每个 Scroll 会话都会在 ES 集群中占用大量内存和文件句柄来维持上下文。大量用户同时发起实时请求,会瞬间拖垮集群。
3. 超时与状态管理复杂 需要维护 scroll_id 并严格控制生存时间。在用户不可控的网络环境下,连接极易超时,导致翻页中断,体验极差。

正确工具选择:

根据您的场景,应使用不同的 ES 分页功能:

场景 推荐方案 简要说明
用户界面实时分页<br>(如“第1页/共N页”) from + size 适用于浅分页(如 from=0, size=20)。深度分页(如第1000页)性能会急剧下降,因为 from 值过大会导致全局排序和跳过成本极高。
用户界面深度浏览<br>(如无限滚动、连续读取) search_after 当前最佳实践。配合排序字段(如时间戳和唯一ID),提供稳定的实时游标。性能高,且能反映查询时刻的最新数据。
后台批量处理/全量导出<br>(如数据分析、数据同步) Scroll APIPoint In Time (PIT) + search_after 这正是您代码中的场景。适合在后台安静地处理大量数据,不追求实时性,但要求稳定、完整地遍历。

举例说明:

错误用法(用户翻页):

JAVA
// 用户点击“下一页”时
SearchRequest request = SearchRequest.of(r -> r
    .index("products")
    .scroll("1m") // ❌ 为每个用户请求创建 Scroll,集群很快崩溃
    .size(20)
);

正确用法(用户翻页):

JAVA
// 使用 search_after 实现
SearchRequest request = SearchRequest.of(r -> r
    .index("products")
    .sort(s -> s.field(f -> f.field("price").order(SortOrder.Asc)))
    .sort(s -> s.field(f -> f.field("_id").order(SortOrder.Asc))) // 确保顺序唯一
    .size(20)
    .search_after(List.of(lastPrice, lastId)) // 传入上一页最后一条的排序值
);

总结对比表:

特性 Scroll API from/size search_after
实时性 快照(非实时) 实时 实时
资源占用 很高(需维护上下文) 低(浅分页时)<br>高(深度分页时) 低(与 from 无关)
适用场景 后台批处理/全量导出 用户界面浅分页(如前10页) 用户界面深度浏览/无限滚动
数据一致性 搜索开始时的一致快照 每次查询时的最新数据 每次查询时的最新数据

结论:您的原始代码使用 Scroll API 是完全正确的,因为它显然是后台批处理任务(统计计数、遍历全量数据)。但请务必将其与服务实时用户请求的代码区分开,后者应选择 search_after 或浅分页。

全部评论: 0

    我有话说: