MongoDB聚合管道操作详解

小豆丁 19小时前 ⋅ 7 阅读
聊天记录
头像

mongoTemplate.aggregate ?

头像

mongoTemplate.aggregateMongoDB 聚合管道操作的核心方法,用于执行复杂的数据分析和转换。让我详细解释:

1. 基本概念

JAVA
// 聚合管道就像数据处理流水线
Aggregation aggregation = Aggregation.newAggregation(
    // 第一阶段
    Aggregation.match(Criteria.where("status").is("active")),
    
    // 第二阶段  
    Aggregation.group("category").sum("amount").as("totalAmount"),
    
    // 第三阶段
    Aggregation.sort(Sort.Direction.DESC, "totalAmount")
);

// 执行聚合
AggregationResults<Document> results = mongoTemplate.aggregate(
    aggregation, 
    "orders",  // 集合名
    Document.class  // 返回类型
);

2. 核心方法签名

JAVA
// 主要方法签名
<T> AggregationResults<T> aggregate(
    Aggregation aggregation,          // 聚合管道定义
    String inputCollectionName,      // 输入集合名
    Class<T> outputType              // 输出类型
);

<T> AggregationResults<T> aggregate(
    Aggregation aggregation,          // 聚合管道定义
    Class<?> inputType,              // 输入实体类
    Class<T> outputType              // 输出类型
);

<T> AggregationResults<T> aggregate(
    TypedAggregation<?> aggregation,  // 类型化聚合
    Class<T> outputType              // 输出类型
);

3. 完整的聚合阶段

3.1 所有聚合阶段

JAVA
// Spring Data MongoDB 支持的聚合阶段
Aggregation.match()        // 过滤文档
Aggregation.group()        // 分组统计
Aggregation.project()      // 字段投影/计算
Aggregation.sort()         // 排序
Aggregation.limit()        // 限制数量
Aggregation.skip()         // 跳过文档
Aggregation.unwind()       // 展开数组
Aggregation.facet()        // 多面聚合
Aggregation.lookup()       // 左连接
Aggregation.graphLookup()  // 图查找
Aggregation.addFields()    // 添加字段
Aggregation.replaceRoot()  // 替换根文档
Aggregation.count()        // 计数
Aggregation.bucket()       // 分组桶
Aggregation.bucketAuto()   // 自动分组桶
Aggregation.sortByCount()  // 按计数排序
Aggregation.sample()       // 随机抽样
Aggregation.unionWith()    // 合并集合
Aggregation.merge()        // 合并到集合

4. 详细示例

4.1 基础聚合示例

JAVA
// 示例1:简单的分组统计
public List<CategoryTotal> getCategoryTotals() {
    Aggregation aggregation = Aggregation.newAggregation(
        // 1. 过滤条件
        Aggregation.match(
            Criteria.where("status").is("completed")
                   .and("amount").gte(100)
        ),
        
        // 2. 按类别分组,计算总额
        Aggregation.group("category")
            .sum("amount").as("totalAmount")
            .avg("amount").as("avgAmount")
            .count().as("orderCount")
            .first("category").as("categoryName"),
        
        // 3. 投影:选择需要的字段
        Aggregation.project("categoryName", "totalAmount", "avgAmount", "orderCount")
            .andExclude("_id"),
        
        // 4. 按总额降序排序
        Aggregation.sort(Sort.Direction.DESC, "totalAmount"),
        
        // 5. 只取前10个
        Aggregation.limit(10)
    );
    
    return mongoTemplate.aggregate(
        aggregation, 
        "orders", 
        CategoryTotal.class
    ).getMappedResults();
}

// 返回的 DTO
public class CategoryTotal {
    private String categoryName;
    private BigDecimal totalAmount;
    private BigDecimal avgAmount;
    private Integer orderCount;
    // getter/setter
}

4.2 复杂聚合示例

JAVA
// 示例2:多阶段复杂聚合
public List<CustomerReport> getCustomerReport(Date startDate, Date endDate) {
    Aggregation aggregation = Aggregation.newAggregation(
        // 阶段1: 过滤
        Aggregation.match(
            Criteria.where("orderDate")
                .gte(startDate)
                .lte(endDate)
                .and("status").in("completed", "shipped")
        ),
        
        // 阶段2: 展开订单项数组
        Aggregation.unwind("items"),
        
        // 阶段3: 按客户和产品分组
        Aggregation.group("customerId", "items.productId")
            .sum("items.quantity").as("totalQuantity")
            .sum("items.subtotal").as("totalSales")
            .addToSet("items.productName").as("productNames"),
        
        // 阶段4: 连接客户信息
        Aggregation.lookup(
            "customers",        // 连接的目标集合
            "_id.customerId",   // 本地字段
            "_id",              // 目标集合字段
            "customerInfo"      // 输出字段名
        ),
        
        // 阶段5: 展开客户信息
        Aggregation.unwind("customerInfo", true),  // preserveNullAndEmptyArrays = true
        
        // 阶段6: 添加计算字段
        Aggregation.addFields()
            .addField("unitPrice")
            .withValue(ArithmeticOperators.Divide
                .valueOf("totalSales")
                .divideBy("totalQuantity"))
            .build(),
        
        // 阶段7: 最终投影
        Aggregation.project()
            .and("_id.customerId").as("customerId")
            .and("customerInfo.name").as("customerName")
            .and("_id.productId").as("productId")
            .and("productNames").arrayElementAt(0).as("productName")
            .and("totalQuantity").as("quantity")
            .and("totalSales").as("sales")
            .and("unitPrice").as("unitPrice")
            .andExclude("_id"),
        
        // 阶段8: 排序
        Aggregation.sort(Sort.by("customerName", "productName").ascending())
    );
    
    return mongoTemplate.aggregate(
        aggregation, 
        "orders", 
        CustomerReport.class
    ).getMappedResults();
}

5. 聚合表达式

5.1 算术表达式

JAVA
// 使用算术表达式
Aggregation aggregation = Aggregation.newAggregation(
    Aggregation.project()
        .and("price").as("originalPrice")
        .and(ArithmeticOperators.Multiply
            .valueOf("price")
            .multiplyBy(0.8))  // 打8折
            .as("discountedPrice")
        .and(ArithmeticOperators.Subtract
            .valueOf("price")
            .subtract("cost"))
            .as("profitMargin")
        .and.as("totalPrice")  // 使用表达式字符串
);

5.2 字符串表达式

JAVA
// 字符串操作
Aggregation.project()
    .and(StringOperators.Concat
        .valueOf("firstName")
        .concat(" ")
        .concatValueOf("lastName"))
        .as("fullName")
    .and(StringOperators.Substr
        .valueOf("email")
        .substring(0, 5))
        .as("emailPrefix")
    .and(StringOperators.ToLower
        .valueOf("city"))
        .as("cityLowerCase");

5.3 日期表达式

JAVA
// 日期操作
Aggregation.project()
    .and(DateOperators.DateToString
        .dateOf("orderDate")
        .toString("%Y-%m-%d"))
        .as("orderDateString")
    .and(DateOperators.Year
        .yearOf("orderDate"))
        .as("orderYear")
    .and(DateOperators.DayOfMonth
        .dayOfMonth("orderDate"))
        .as("orderDay");

6. 高级特性

6.1 Facet 多面聚合

JAVA
// 同一数据集的不同维度聚合
Aggregation aggregation = Aggregation.newAggregation(
    Aggregation.facet()
        .and(
            Aggregation.match(Criteria.where("status").is("active")),
            Aggregation.count().as("activeCount")
        ).as("activeStats")
        .and(
            Aggregation.match(Criteria.where("amount").gte(1000)),
            Aggregation.group("category").count().as("categoryCount"),
            Aggregation.sort(Sort.Direction.DESC, "categoryCount")
        ).as("bigOrdersByCategory")
        .and(
            Aggregation.project()
                .and").as("year")
                .and").as("month"),
            Aggregation.group("year", "month")
                .sum("amount").as("monthlyTotal")
        ).as("monthlyTotals")
);

AggregationResults<Document> results = mongoTemplate.aggregate(
    aggregation, 
    "orders", 
    Document.class
);

6.2 Graph Lookup 图查找

JAVA
// 查询组织结构
Aggregation aggregation = Aggregation.newAggregation(
    Aggregation.match(Criteria.where("employeeId").is("123")),
    
    Aggregation.graphLookup("employees")
        .startWith("$managerId")
        .connectFrom("managerId")
        .connectTo("employeeId")
        .as("managementChain")
        .maxDepth(5)  // 最大深度
        .depthField("level")
);

// 返回指定员工的所有上级
List<Document> managementChain = results.getMappedResults();

6.3 Bucket 分组桶

JAVA
// 将数据分组到指定范围
Aggregation aggregation = Aggregation.newAggregation(
    Aggregation.bucket("amount")
        .withBoundaries(0, 100, 500, 1000, 5000)  // 价格区间
        .withDefaultBucket("above_5000")           // 默认桶
        .andOutputCount().as("count")
        .andOutput("amount").sum().as("totalAmount")
        .andOutput("amount").avg().as("avgAmount")
);

7. 性能优化

7.1 索引优化

JAVA
// 确保聚合查询使用索引
@CompoundIndex(name = "status_date_idx", def = "{'status': 1, 'orderDate': -1}")
@Document(collection = "orders")
public class Order {
    // ...
}

// 查询时使用索引字段
Aggregation aggregation = Aggregation.newAggregation(
    Aggregation.match(
        Criteria.where("status").is("completed")
               .and("orderDate").gte(startDate)  // 使用复合索引字段
    ),
    // ...
);

7.2 分页聚合

JAVA
// 聚合结果分页
public AggregationPage<OrderSummary> getOrderSummaryPage(int page, int size) {
    Aggregation aggregation = Aggregation.newAggregation(
        Aggregation.match(Criteria.where("status").is("completed")),
        Aggregation.group("category")
            .sum("amount").as("totalAmount")
            .count().as("orderCount"),
        Aggregation.sort(Sort.Direction.DESC, "totalAmount"),
        Aggregation.skip((long) page * size),  // 分页
        Aggregation.limit(size)
    );
    
    AggregationResults<OrderSummary> results = mongoTemplate.aggregate(
        aggregation, 
        "orders", 
        OrderSummary.class
    );
    
    // 获取总数
    long total = mongoTemplate.count(
        Query.query(Criteria.where("status").is("completed")), 
        "orders"
    );
    
    return new AggregationPage<>(
        results.getMappedResults(),
        PageRequest.of(page, size),
        total
    );
}

8. 聚合结果处理

8.1 多种返回类型

JAVA
// 1. 返回 Document
AggregationResults<Document> docResults = mongoTemplate.aggregate(
    aggregation, "orders", Document.class
);
for (Document doc : docResults) {
    System.out.println(doc.toJson());
}

// 2. 返回自定义 DTO
AggregationResults<OrderStats> dtoResults = mongoTemplate.aggregate(
    aggregation, "orders", OrderStats.class
);
List<OrderStats> stats = dtoResults.getMappedResults();

// 3. 返回 Map
AggregationResults<Map> mapResults = mongoTemplate.aggregate(
    aggregation, "orders", Map.class
);

// 4. 返回原始输出
AggregationOutput rawOutput = mongoTemplate.aggregate(
    aggregation, "orders", BasicDBObject.class
);

8.2 处理大数据集

JAVA
// 使用游标处理大量数据
public void processLargeAggregation() {
    Aggregation aggregation = Aggregation.newAggregation(
        Aggregation.match(Criteria.where("status").is("active")),
        Aggregation.group("category").sum("amount").as("total")
    );
    
    // 使用 cursor
    aggregation.withOptions(
        AggregationOptions.builder()
            .cursor(new Document("batchSize", 1000))  // 批次大小
            .allowDiskUse(true)  // 允许使用磁盘
            .build()
    );
    
    try (CloseableIterator<Document> iterator = mongoTemplate
            .aggregateStream(aggregation, "orders", Document.class)) {
        while (iterator.hasNext()) {
            Document doc = iterator.next();
            processDocument(doc);
        }
    }
}

9. 常见聚合模式

9.1 统计报表

JAVA
public SalesReport getSalesReport(DateRange range) {
    Aggregation aggregation = Aggregation.newAggregation(
        Aggregation.match(
            Criteria.where("orderDate")
                .gte(range.getStartDate())
                .lte(range.getEndDate())
        ),
        
        // 按日期统计
        Aggregation.facet()
            .and(
                Aggregation.project()
                    .and(DateOperators.DateToString
                        .dateOf("orderDate")
                        .toString("%Y-%m-%d")).as("date"),
                Aggregation.group("date")
                    .sum("amount").as("dailyTotal")
                    .count().as("orderCount"),
                Aggregation.sort(Sort.by("date").ascending())
            ).as("dailySales")
            
            // 按产品统计
            .and(
                Aggregation.unwind("items"),
                Aggregation.group("items.productId", "items.productName")
                    .sum("items.quantity").as("totalQuantity")
                    .sum("items.subtotal").as("totalSales"),
                Aggregation.sort(Sort.by("totalSales").descending()),
                Aggregation.limit(10)
            ).as("topProducts")
            
            // 按地区统计
            .and(
                Aggregation.lookup("customers", "customerId", "_id", "customer"),
                Aggregation.unwind("customer"),
                Aggregation.group("customer.region")
                    .sum("amount").as("regionTotal")
                    .count().as("orderCount"),
                Aggregation.sort(Sort.by("regionTotal").descending())
            ).as("salesByRegion")
    );
    
    AggregationResults<Document> results = mongoTemplate.aggregate(
        aggregation, 
        "orders", 
        Document.class
    );
    
    return mapToSalesReport(results.getUniqueMappedResult());
}

9.2 数据清洗

JAVA
public void cleanAndTransformData() {
    Aggregation aggregation = Aggregation.newAggregation(
        // 1. 过滤无效数据
        Aggregation.match(
            Criteria.where("amount").gt(0)
                   .and("customerId").exists(true)
        ),
        
        // 2. 标准化字段
        Aggregation.addFields()
            .addField("normalizedEmail")
            .withValue(StringOperators.ToLower.valueOf("email"))
            .build()
            .addField("category")
            .withValue(ConditionalOperators.ifNull("category").then("uncategorized"))
            .build(),
        
        // 3. 计算衍生字段
        Aggregation.addFields()
            .addField("profit")
            .withValue(ArithmeticOperators.Subtract
                .valueOf("amount")
                .subtract("cost"))
            .build()
            .addField("profitMargin")
            .withValue(ArithmeticOperators.Multiply
                .valueOf(
                    ArithmeticOperators.Divide
                        .valueOf("profit")
                        .divideBy("amount")
                )
                .multiplyBy(100))
            .build(),
        
        // 4. 写入新集合
        Aggregation.out("cleaned_orders")
    );
    
    mongoTemplate.aggregate(aggregation, "raw_orders", Document.class);
}

10. 调试和优化

10.1 查看生成的聚合管道

JAVA
// 打印聚合管道
Aggregation aggregation = ...;
String pipeline = aggregation.toString();
System.out.println("Aggregation Pipeline: " + pipeline);

// 或者获取 MongoDB 原生格式
List<Document> pipelineDoc = aggregation.toPipeline(Aggregation.DEFAULT_CONTEXT);
System.out.println("MongoDB Pipeline: " + pipelineDoc);

// 使用 explain
AggregationResults<Document> results = mongoTemplate.aggregate(
    aggregation.withOptions(
        AggregationOptions.builder()
            .explain(true)  // 返回执行计划
            .build()
    ),
    "orders",
    Document.class
);

10.2 监控聚合性能

JAVA
@Repository
public class OrderRepository {
    
    @Autowired
    private MongoTemplate mongoTemplate;
    
    public AggregationResults<Document> aggregateWithMetrics(Aggregation aggregation) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        
        try {
            return mongoTemplate.aggregate(aggregation, "orders", Document.class);
        } finally {
            stopWatch.stop();
            log.info("Aggregation executed in {} ms", stopWatch.getTotalTimeMillis());
        }
    }
}

11. 实际应用场景

11.1 电商分析

JAVA
public ECommerceAnalytics getAnalytics(LocalDate start, LocalDate end) {
    Aggregation aggregation = Aggregation.newAggregation(
        Aggregation.match(
            Criteria.where("orderDate").gte(start).lte(end)
        ),
        
        Aggregation.facet()
            // 总体统计
            .and(
                Aggregation.group()
                    .sum("amount").as("totalRevenue")
                    .count().as("totalOrders")
                    .addToSet("customerId").as("uniqueCustomers"),
                Aggregation.project()
                    .and("totalRevenue").as("revenue")
                    .and("totalOrders").as("orders")
                    .and(ArrayOperators.Size.lengthOfArray("uniqueCustomers")).as("customers")
            ).as("overview")
            
            // 转化率漏斗
            .and(
                Aggregation.group("status")
                    .count().as("count"),
                Aggregation.sort(Sort.by("count").descending())
            ).as("funnel")
            
            // 客户生命周期价值
            .and(
                Aggregation.group("customerId")
                    .sum("amount").as("lifetimeValue")
                    .count().as("orderCount")
                    .min("orderDate").as("firstOrderDate")
                    .max("orderDate").as("lastOrderDate"),
                Aggregation.bucketAuto("lifetimeValue", 5)  // 自动分成5个桶
            ).as("clvSegments")
    );
    
    return mongoTemplate.aggregate(
        aggregation, 
        "orders", 
        ECommerceAnalytics.class
    ).getUniqueMappedResult();
}

总结

mongoTemplate.aggregate 是 MongoDB 中最强大的数据分析工具,支持:

  1. 复杂数据处理:多阶段管道处理
  2. 丰富聚合阶段:match、group、project、lookup 等
  3. 多种表达式:算术、字符串、日期、条件表达式
  4. 高级特性:Facet、Graph Lookup、Bucket
  5. 性能优化:索引、分页、游标
  6. 灵活输出:Document、DTO、Map 等

使用建议

  • 复杂业务逻辑优先考虑聚合而不是多次查询
  • 使用索引优化 match 阶段
  • 分阶段测试聚合管道
  • 监控聚合性能
  • 考虑使用 allowDiskUse(true) 处理大数据集

全部评论: 0

    我有话说: