mongoTemplate.aggregate ?
mongoTemplate.aggregate 是 MongoDB 聚合管道操作的核心方法,用于执行复杂的数据分析和转换。让我详细解释:
1. 基本概念
JAVA
// 聚合管道就像数据处理流水线
Aggregation aggregation = Aggregation.newAggregation(
// 第一阶段
Aggregation.match(Criteria.where("status").is("active")),
// 第二阶段
Aggregation.group("category").sum("amount").as("totalAmount"),
// 第三阶段
Aggregation.sort(Sort.Direction.DESC, "totalAmount")
);
// 执行聚合
AggregationResults<Document> results = mongoTemplate.aggregate(
aggregation,
"orders", // 集合名
Document.class // 返回类型
);
2. 核心方法签名
JAVA
// 主要方法签名
<T> AggregationResults<T> aggregate(
Aggregation aggregation, // 聚合管道定义
String inputCollectionName, // 输入集合名
Class<T> outputType // 输出类型
);
<T> AggregationResults<T> aggregate(
Aggregation aggregation, // 聚合管道定义
Class<?> inputType, // 输入实体类
Class<T> outputType // 输出类型
);
<T> AggregationResults<T> aggregate(
TypedAggregation<?> aggregation, // 类型化聚合
Class<T> outputType // 输出类型
);
3. 完整的聚合阶段
3.1 所有聚合阶段
JAVA
// Spring Data MongoDB 支持的聚合阶段
Aggregation.match() // 过滤文档
Aggregation.group() // 分组统计
Aggregation.project() // 字段投影/计算
Aggregation.sort() // 排序
Aggregation.limit() // 限制数量
Aggregation.skip() // 跳过文档
Aggregation.unwind() // 展开数组
Aggregation.facet() // 多面聚合
Aggregation.lookup() // 左连接
Aggregation.graphLookup() // 图查找
Aggregation.addFields() // 添加字段
Aggregation.replaceRoot() // 替换根文档
Aggregation.count() // 计数
Aggregation.bucket() // 分组桶
Aggregation.bucketAuto() // 自动分组桶
Aggregation.sortByCount() // 按计数排序
Aggregation.sample() // 随机抽样
Aggregation.unionWith() // 合并集合
Aggregation.merge() // 合并到集合
4. 详细示例
4.1 基础聚合示例
JAVA
// 示例1:简单的分组统计
public List<CategoryTotal> getCategoryTotals() {
Aggregation aggregation = Aggregation.newAggregation(
// 1. 过滤条件
Aggregation.match(
Criteria.where("status").is("completed")
.and("amount").gte(100)
),
// 2. 按类别分组,计算总额
Aggregation.group("category")
.sum("amount").as("totalAmount")
.avg("amount").as("avgAmount")
.count().as("orderCount")
.first("category").as("categoryName"),
// 3. 投影:选择需要的字段
Aggregation.project("categoryName", "totalAmount", "avgAmount", "orderCount")
.andExclude("_id"),
// 4. 按总额降序排序
Aggregation.sort(Sort.Direction.DESC, "totalAmount"),
// 5. 只取前10个
Aggregation.limit(10)
);
return mongoTemplate.aggregate(
aggregation,
"orders",
CategoryTotal.class
).getMappedResults();
}
// 返回的 DTO
public class CategoryTotal {
private String categoryName;
private BigDecimal totalAmount;
private BigDecimal avgAmount;
private Integer orderCount;
// getter/setter
}
4.2 复杂聚合示例
JAVA
// 示例2:多阶段复杂聚合
public List<CustomerReport> getCustomerReport(Date startDate, Date endDate) {
Aggregation aggregation = Aggregation.newAggregation(
// 阶段1: 过滤
Aggregation.match(
Criteria.where("orderDate")
.gte(startDate)
.lte(endDate)
.and("status").in("completed", "shipped")
),
// 阶段2: 展开订单项数组
Aggregation.unwind("items"),
// 阶段3: 按客户和产品分组
Aggregation.group("customerId", "items.productId")
.sum("items.quantity").as("totalQuantity")
.sum("items.subtotal").as("totalSales")
.addToSet("items.productName").as("productNames"),
// 阶段4: 连接客户信息
Aggregation.lookup(
"customers", // 连接的目标集合
"_id.customerId", // 本地字段
"_id", // 目标集合字段
"customerInfo" // 输出字段名
),
// 阶段5: 展开客户信息
Aggregation.unwind("customerInfo", true), // preserveNullAndEmptyArrays = true
// 阶段6: 添加计算字段
Aggregation.addFields()
.addField("unitPrice")
.withValue(ArithmeticOperators.Divide
.valueOf("totalSales")
.divideBy("totalQuantity"))
.build(),
// 阶段7: 最终投影
Aggregation.project()
.and("_id.customerId").as("customerId")
.and("customerInfo.name").as("customerName")
.and("_id.productId").as("productId")
.and("productNames").arrayElementAt(0).as("productName")
.and("totalQuantity").as("quantity")
.and("totalSales").as("sales")
.and("unitPrice").as("unitPrice")
.andExclude("_id"),
// 阶段8: 排序
Aggregation.sort(Sort.by("customerName", "productName").ascending())
);
return mongoTemplate.aggregate(
aggregation,
"orders",
CustomerReport.class
).getMappedResults();
}
5. 聚合表达式
5.1 算术表达式
JAVA
// 使用算术表达式
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.project()
.and("price").as("originalPrice")
.and(ArithmeticOperators.Multiply
.valueOf("price")
.multiplyBy(0.8)) // 打8折
.as("discountedPrice")
.and(ArithmeticOperators.Subtract
.valueOf("price")
.subtract("cost"))
.as("profitMargin")
.and.as("totalPrice") // 使用表达式字符串
);
5.2 字符串表达式
JAVA
// 字符串操作
Aggregation.project()
.and(StringOperators.Concat
.valueOf("firstName")
.concat(" ")
.concatValueOf("lastName"))
.as("fullName")
.and(StringOperators.Substr
.valueOf("email")
.substring(0, 5))
.as("emailPrefix")
.and(StringOperators.ToLower
.valueOf("city"))
.as("cityLowerCase");
5.3 日期表达式
JAVA
// 日期操作
Aggregation.project()
.and(DateOperators.DateToString
.dateOf("orderDate")
.toString("%Y-%m-%d"))
.as("orderDateString")
.and(DateOperators.Year
.yearOf("orderDate"))
.as("orderYear")
.and(DateOperators.DayOfMonth
.dayOfMonth("orderDate"))
.as("orderDay");
6. 高级特性
6.1 Facet 多面聚合
JAVA
// 同一数据集的不同维度聚合
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.facet()
.and(
Aggregation.match(Criteria.where("status").is("active")),
Aggregation.count().as("activeCount")
).as("activeStats")
.and(
Aggregation.match(Criteria.where("amount").gte(1000)),
Aggregation.group("category").count().as("categoryCount"),
Aggregation.sort(Sort.Direction.DESC, "categoryCount")
).as("bigOrdersByCategory")
.and(
Aggregation.project()
.and").as("year")
.and").as("month"),
Aggregation.group("year", "month")
.sum("amount").as("monthlyTotal")
).as("monthlyTotals")
);
AggregationResults<Document> results = mongoTemplate.aggregate(
aggregation,
"orders",
Document.class
);
6.2 Graph Lookup 图查找
JAVA
// 查询组织结构
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(Criteria.where("employeeId").is("123")),
Aggregation.graphLookup("employees")
.startWith("$managerId")
.connectFrom("managerId")
.connectTo("employeeId")
.as("managementChain")
.maxDepth(5) // 最大深度
.depthField("level")
);
// 返回指定员工的所有上级
List<Document> managementChain = results.getMappedResults();
6.3 Bucket 分组桶
JAVA
// 将数据分组到指定范围
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.bucket("amount")
.withBoundaries(0, 100, 500, 1000, 5000) // 价格区间
.withDefaultBucket("above_5000") // 默认桶
.andOutputCount().as("count")
.andOutput("amount").sum().as("totalAmount")
.andOutput("amount").avg().as("avgAmount")
);
7. 性能优化
7.1 索引优化
JAVA
// 确保聚合查询使用索引
@CompoundIndex(name = "status_date_idx", def = "{'status': 1, 'orderDate': -1}")
@Document(collection = "orders")
public class Order {
// ...
}
// 查询时使用索引字段
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(
Criteria.where("status").is("completed")
.and("orderDate").gte(startDate) // 使用复合索引字段
),
// ...
);
7.2 分页聚合
JAVA
// 聚合结果分页
public AggregationPage<OrderSummary> getOrderSummaryPage(int page, int size) {
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(Criteria.where("status").is("completed")),
Aggregation.group("category")
.sum("amount").as("totalAmount")
.count().as("orderCount"),
Aggregation.sort(Sort.Direction.DESC, "totalAmount"),
Aggregation.skip((long) page * size), // 分页
Aggregation.limit(size)
);
AggregationResults<OrderSummary> results = mongoTemplate.aggregate(
aggregation,
"orders",
OrderSummary.class
);
// 获取总数
long total = mongoTemplate.count(
Query.query(Criteria.where("status").is("completed")),
"orders"
);
return new AggregationPage<>(
results.getMappedResults(),
PageRequest.of(page, size),
total
);
}
8. 聚合结果处理
8.1 多种返回类型
JAVA
// 1. 返回 Document
AggregationResults<Document> docResults = mongoTemplate.aggregate(
aggregation, "orders", Document.class
);
for (Document doc : docResults) {
System.out.println(doc.toJson());
}
// 2. 返回自定义 DTO
AggregationResults<OrderStats> dtoResults = mongoTemplate.aggregate(
aggregation, "orders", OrderStats.class
);
List<OrderStats> stats = dtoResults.getMappedResults();
// 3. 返回 Map
AggregationResults<Map> mapResults = mongoTemplate.aggregate(
aggregation, "orders", Map.class
);
// 4. 返回原始输出
AggregationOutput rawOutput = mongoTemplate.aggregate(
aggregation, "orders", BasicDBObject.class
);
8.2 处理大数据集
JAVA
// 使用游标处理大量数据
public void processLargeAggregation() {
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(Criteria.where("status").is("active")),
Aggregation.group("category").sum("amount").as("total")
);
// 使用 cursor
aggregation.withOptions(
AggregationOptions.builder()
.cursor(new Document("batchSize", 1000)) // 批次大小
.allowDiskUse(true) // 允许使用磁盘
.build()
);
try (CloseableIterator<Document> iterator = mongoTemplate
.aggregateStream(aggregation, "orders", Document.class)) {
while (iterator.hasNext()) {
Document doc = iterator.next();
processDocument(doc);
}
}
}
9. 常见聚合模式
9.1 统计报表
JAVA
public SalesReport getSalesReport(DateRange range) {
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(
Criteria.where("orderDate")
.gte(range.getStartDate())
.lte(range.getEndDate())
),
// 按日期统计
Aggregation.facet()
.and(
Aggregation.project()
.and(DateOperators.DateToString
.dateOf("orderDate")
.toString("%Y-%m-%d")).as("date"),
Aggregation.group("date")
.sum("amount").as("dailyTotal")
.count().as("orderCount"),
Aggregation.sort(Sort.by("date").ascending())
).as("dailySales")
// 按产品统计
.and(
Aggregation.unwind("items"),
Aggregation.group("items.productId", "items.productName")
.sum("items.quantity").as("totalQuantity")
.sum("items.subtotal").as("totalSales"),
Aggregation.sort(Sort.by("totalSales").descending()),
Aggregation.limit(10)
).as("topProducts")
// 按地区统计
.and(
Aggregation.lookup("customers", "customerId", "_id", "customer"),
Aggregation.unwind("customer"),
Aggregation.group("customer.region")
.sum("amount").as("regionTotal")
.count().as("orderCount"),
Aggregation.sort(Sort.by("regionTotal").descending())
).as("salesByRegion")
);
AggregationResults<Document> results = mongoTemplate.aggregate(
aggregation,
"orders",
Document.class
);
return mapToSalesReport(results.getUniqueMappedResult());
}
9.2 数据清洗
JAVA
public void cleanAndTransformData() {
Aggregation aggregation = Aggregation.newAggregation(
// 1. 过滤无效数据
Aggregation.match(
Criteria.where("amount").gt(0)
.and("customerId").exists(true)
),
// 2. 标准化字段
Aggregation.addFields()
.addField("normalizedEmail")
.withValue(StringOperators.ToLower.valueOf("email"))
.build()
.addField("category")
.withValue(ConditionalOperators.ifNull("category").then("uncategorized"))
.build(),
// 3. 计算衍生字段
Aggregation.addFields()
.addField("profit")
.withValue(ArithmeticOperators.Subtract
.valueOf("amount")
.subtract("cost"))
.build()
.addField("profitMargin")
.withValue(ArithmeticOperators.Multiply
.valueOf(
ArithmeticOperators.Divide
.valueOf("profit")
.divideBy("amount")
)
.multiplyBy(100))
.build(),
// 4. 写入新集合
Aggregation.out("cleaned_orders")
);
mongoTemplate.aggregate(aggregation, "raw_orders", Document.class);
}
10. 调试和优化
10.1 查看生成的聚合管道
JAVA
// 打印聚合管道
Aggregation aggregation = ...;
String pipeline = aggregation.toString();
System.out.println("Aggregation Pipeline: " + pipeline);
// 或者获取 MongoDB 原生格式
List<Document> pipelineDoc = aggregation.toPipeline(Aggregation.DEFAULT_CONTEXT);
System.out.println("MongoDB Pipeline: " + pipelineDoc);
// 使用 explain
AggregationResults<Document> results = mongoTemplate.aggregate(
aggregation.withOptions(
AggregationOptions.builder()
.explain(true) // 返回执行计划
.build()
),
"orders",
Document.class
);
10.2 监控聚合性能
JAVA
@Repository
public class OrderRepository {
@Autowired
private MongoTemplate mongoTemplate;
public AggregationResults<Document> aggregateWithMetrics(Aggregation aggregation) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
try {
return mongoTemplate.aggregate(aggregation, "orders", Document.class);
} finally {
stopWatch.stop();
log.info("Aggregation executed in {} ms", stopWatch.getTotalTimeMillis());
}
}
}
11. 实际应用场景
11.1 电商分析
JAVA
public ECommerceAnalytics getAnalytics(LocalDate start, LocalDate end) {
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(
Criteria.where("orderDate").gte(start).lte(end)
),
Aggregation.facet()
// 总体统计
.and(
Aggregation.group()
.sum("amount").as("totalRevenue")
.count().as("totalOrders")
.addToSet("customerId").as("uniqueCustomers"),
Aggregation.project()
.and("totalRevenue").as("revenue")
.and("totalOrders").as("orders")
.and(ArrayOperators.Size.lengthOfArray("uniqueCustomers")).as("customers")
).as("overview")
// 转化率漏斗
.and(
Aggregation.group("status")
.count().as("count"),
Aggregation.sort(Sort.by("count").descending())
).as("funnel")
// 客户生命周期价值
.and(
Aggregation.group("customerId")
.sum("amount").as("lifetimeValue")
.count().as("orderCount")
.min("orderDate").as("firstOrderDate")
.max("orderDate").as("lastOrderDate"),
Aggregation.bucketAuto("lifetimeValue", 5) // 自动分成5个桶
).as("clvSegments")
);
return mongoTemplate.aggregate(
aggregation,
"orders",
ECommerceAnalytics.class
).getUniqueMappedResult();
}
总结
mongoTemplate.aggregate 是 MongoDB 中最强大的数据分析工具,支持:
- 复杂数据处理:多阶段管道处理
- 丰富聚合阶段:match、group、project、lookup 等
- 多种表达式:算术、字符串、日期、条件表达式
- 高级特性:Facet、Graph Lookup、Bucket
- 性能优化:索引、分页、游标
- 灵活输出:Document、DTO、Map 等
使用建议:
- 复杂业务逻辑优先考虑聚合而不是多次查询
- 使用索引优化 match 阶段
- 分阶段测试聚合管道
- 监控聚合性能
- 考虑使用
allowDiskUse(true)处理大数据集