MongoDatabaseFactory 和 MongoClient 区别是什么
MongoDatabaseFactory 和 MongoClient 是 Spring Data MongoDB 中两个不同层次的核心组件。让我详细解释它们的区别:
1. 本质区别
| 特性 | MongoClient | MongoDatabaseFactory |
|---|---|---|
| 层级 | MongoDB 驱动层 | Spring Data 抽象层 |
| 作用 | 建立物理连接,与 MongoDB 服务器通信 | 工厂模式,创建和管理数据库连接会话 |
| 类比 | JDBC 中的 Connection |
Spring 的 DataSource |
| 创建者 | MongoDB 官方驱动 | Spring Data MongoDB |
| 线程安全 | 是(通常一个应用一个实例) | 通常是(但由具体实现决定) |
2. MongoClient:底层连接客户端
2.1 定义和职责
JAVA
// MongoDB 官方驱动提供的客户端接口
public interface MongoClient extends Closeable {
// 获取指定数据库
MongoDatabase getDatabase(String databaseName);
// 获取所有数据库名
List<String> listDatabaseNames();
// 关闭连接
void close();
// 集群相关信息
ClusterDescription getClusterDescription();
}
2.2 核心功能
JAVA
// 1. 建立物理连接
MongoClient client = MongoClients.create("mongodb://localhost:27017");
// 2. 连接管理
client.startSession(); // 创建会话
client.getClusterDescription(); // 获取集群信息
client.close(); // 关闭连接
// 3. 直接操作(不推荐在 Spring 中直接使用)
MongoDatabase db = client.getDatabase("mydb");
MongoCollection<Document> collection = db.getCollection("users");
collection.insertOne(new Document("name", "Alice"));
2.3 在你的代码中
JAVA
@Bean(name = TurnRightMongo.MONGO_CLIENT)
@Primary
public MongoClient turnRightMongoClient(
@Qualifier(TurnRightMongo.MONGO_PROPERTIES) MongoProperties turnRightMongoProperties,
Environment environment,
ObjectProvider<MongoClientSettingsBuilderCustomizer> builderCustomizers, // 未使用
ObjectProvider<MongoClientSettings> settings // 未使用
) {
// 这里只是简单创建,但应该用更完整的配置
return MongoClients.create(turnRightMongoProperties.getUri());
}
3. MongoDatabaseFactory:Spring 抽象工厂
3.1 定义和职责
JAVA
// Spring Data MongoDB 的工厂接口
public interface MongoDatabaseFactory {
// 获取 MongoDatabase 实例
MongoDatabase getMongoDatabase() throws DataAccessException;
// 获取指定名称的数据库
MongoDatabase getMongoDatabase(String dbName) throws DataAccessException;
// 获取底层的 MongoClient
MongoClient getMongoClient();
// 获取持久化实体转换器
PersistenceExceptionTranslator getExceptionTranslator();
}
3.2 核心实现
JAVA
// 主要实现类:SimpleMongoClientDatabaseFactory
public class SimpleMongoClientDatabaseFactory implements MongoDatabaseFactory {
private final MongoClient mongoClient;
private final String databaseName;
private final boolean mongoInstanceCreated;
public SimpleMongoClientDatabaseFactory(MongoClient mongoClient, String databaseName) {
this.mongoClient = mongoClient;
this.databaseName = databaseName;
this.mongoInstanceCreated = false;
}
public SimpleMongoClientDatabaseFactory(String connectionString) {
// 内部创建 MongoClient
this.mongoClient = MongoClients.create(connectionString);
this.databaseName = extractDatabaseName(connectionString);
this.mongoInstanceCreated = true;
}
@Override
public MongoDatabase getMongoDatabase() throws DataAccessException {
return mongoClient.getDatabase(this.databaseName);
}
}
3.3 在你的代码中
JAVA
@Bean(name = TurnRightMongo.MONGO_DATABASE_FACTORY)
@Primary
public MongoDatabaseFactory turnRightMongoDatabaseFactory(
@Qualifier(TurnRightMongo.MONGO_PROPERTIES) MongoProperties turnRightMongoProperties
) {
// 通过连接字符串直接创建工厂
return new SimpleMongoClientDatabaseFactory(
new ConnectionString(turnRightMongoProperties.getUri())
);
}
4. 关系图和工作流程
┌─────────────────────────────────────────────────────────────┐
│ 应用程序代码 │
│ UserRepository.save(user) │
│ MongoTemplate.findOne(query, User.class) │
└─────────────┬───────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ MongoTemplate │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ MongoDatabaseFactory factory; │ │
│ │ │ │
│ │ public <T> T findOne(Query query, Class<T> clazz){│ │
│ │ // 1. 从工厂获取数据库 │ │
│ │ MongoDatabase db = factory.getMongoDatabase(); │ │
│ │ │ │
│ │ // 2. 执行查询 │ │
│ │ FindIterable<Document> result = │ │
│ │ db.getCollection("users").find(filter); │ │
│ │ } │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────┬───────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ MongoDatabaseFactory │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ private MongoClient mongoClient; │ │
│ │ private String databaseName; │ │
│ │ │ │
│ │ public MongoDatabase getMongoDatabase() { │ │
│ │ // 从 MongoClient 获取数据库 │ │
│ │ return mongoClient.getDatabase(databaseName); │ │
│ │ } │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────┬───────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ MongoClient │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ private final Cluster cluster; │ │
│ │ private final MongoClientSettings settings; │ │
│ │ private final ConnectionPool connectionPool; │ │
│ │ │ │
│ │ public MongoDatabase getDatabase(String name) { │ │
│ │ // 实际的网络通信 │ │
│ │ return new MongoDatabaseImpl(name, ...); │ │
│ │ } │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
5. 实际使用对比
5.1 直接使用 MongoClient(不推荐在 Spring 中)
JAVA
// 纯 MongoDB 驱动方式
MongoClient client = MongoClients.create("mongodb://localhost:27017");
MongoDatabase database = client.getDatabase("mydb");
MongoCollection<Document> collection = database.getCollection("users");
// 插入文档
Document doc = new Document("name", "John")
.append("age", 30)
.append("city", "New York");
collection.insertOne(doc);
// 查询
Document result = collection.find(eq("name", "John")).first();
// 需要手动处理异常、资源关闭等
5.2 通过 MongoDatabaseFactory(Spring 方式)
JAVA
@Service
public class UserService {
@Autowired
private MongoTemplate mongoTemplate; // 内部使用 MongoDatabaseFactory
public User findUser(String name) {
// Spring 处理异常转换、资源管理等
Query query = Query.query(Criteria.where("name").is(name));
return mongoTemplate.findOne(query, User.class);
}
}
5.3 MongoTemplate 内部实现
JAVA
public class MongoTemplate implements MongoOperations {
private final MongoDatabaseFactory mongoDatabaseFactory;
public <T> T findOne(Query query, Class<T> entityClass, String collectionName) {
try {
// 1. 从工厂获取数据库
MongoDatabase db = mongoDatabaseFactory.getMongoDatabase();
// 2. 执行操作
MongoCollection<Document> collection = db.getCollection(collectionName);
Document document = collection.find(query.getQueryObject()).first();
// 3. 使用 MappingMongoConverter 转换结果
return mappingMongoConverter.read(entityClass, document);
} catch (Exception e) {
// 4. Spring 异常转换
throw translateException(e);
}
}
}
6. 配置示例对比
6.1 完整的 MongoClient 配置
JAVA
@Bean
public MongoClient mongoClient() {
// 详细的客户端配置
ConnectionString connectionString = new ConnectionString("mongodb://localhost:27017/mydb");
MongoClientSettings settings = MongoClientSettings.builder()
.applyConnectionString(connectionString)
.applyToClusterSettings(builder ->
builder.serverSelectionTimeout(30, TimeUnit.SECONDS))
.applyToConnectionPoolSettings(builder ->
builder.maxSize(100)
.minSize(10)
.maxWaitTime(2, TimeUnit.MINUTES))
.applyToSocketSettings(builder ->
builder.connectTimeout(10, TimeUnit.SECONDS)
.readTimeout(10, TimeUnit.SECONDS))
.codecRegistry(CodecRegistries.fromRegistries(
MongoClientSettings.getDefaultCodecRegistry(),
CodecRegistries.fromProviders(PojoCodecProvider.builder()
.automatic(true)
.build())
))
.build();
return MongoClients.create(settings);
}
6.2 完整的 MongoDatabaseFactory 配置
JAVA
@Bean
public MongoDatabaseFactory mongoDatabaseFactory(MongoClient mongoClient) {
// 使用配置的客户端
return new SimpleMongoClientDatabaseFactory(mongoClient, "mydb");
}
// 或者直接通过连接字符串
@Bean
public MongoDatabaseFactory mongoDatabaseFactory() {
return new SimpleMongoClientDatabaseFactory(
"mongodb://user:password@localhost:27017,localhost:27018/mydb?replicaSet=rs0"
);
}
7. 在你的配置中的问题
JAVA
// 问题:创建了两个独立的 MongoClient
@Bean
public MongoClient turnRightMongoClient(MongoProperties properties) {
return MongoClients.create(properties.getUri()); // 第一个客户端
}
@Bean
public MongoDatabaseFactory turnRightMongoDatabaseFactory(MongoProperties properties) {
// 这里又创建了一个新的 MongoClient
return new SimpleMongoClientDatabaseFactory(
new ConnectionString(properties.getUri()) // 第二个客户端
);
}
优化建议:
JAVA
@Bean
public MongoClient turnRightMongoClient(MongoProperties properties) {
// 配置完整的客户端
MongoClientSettings settings = MongoClientSettings.builder()
.applyConnectionString(new ConnectionString(properties.getUri()))
.applyToConnectionPoolSettings(builder ->
builder.maxSize(50).minSize(5))
.build();
return MongoClients.create(settings);
}
@Bean
public MongoDatabaseFactory turnRightMongoDatabaseFactory(MongoClient mongoClient,
MongoProperties properties) {
// 复用同一个 MongoClient
String databaseName = extractDatabaseName(properties.getUri());
return new SimpleMongoClientDatabaseFactory(mongoClient, databaseName);
}
8. Spring Boot 自动配置的对比
8.1 Spring Boot 的默认配置
YAML
# application.yml
spring:
data:
mongodb:
uri: mongodb://user:password@host1:27017,host2:27017/database
# 或者分开配置
host: localhost
port: 27017
database: mydb
username: user
password: pass
Spring Boot 自动配置会:
- 创建
MongoClient(配置连接池、超时等) - 创建
MongoDatabaseFactory(使用上面的MongoClient) - 创建
MongoTemplate(使用上面的MongoDatabaseFactory)
8.2 你的配置 vs Spring Boot 自动配置
| 特性 | 你的配置 | Spring Boot 自动配置 |
|---|---|---|
| MongoClient 创建 | 直接通过 URI 创建 | 通过 MongoClientSettingsBuilderCustomizer 自定义 |
| 连接池配置 | 无 | 自动配置连接池 |
| SSL 配置 | 无 | 支持自动配置 |
| 监控指标 | 无 | 集成 Micrometer 监控 |
| 健康检查 | 无 | 自动注册健康指示器 |
9. 使用场景总结
使用 MongoClient 的场景:
- 非 Spring 环境下的纯 Java 应用
- 需要细粒度控制连接参数
- 直接使用 MongoDB 驱动功能
- 多线程环境下的连接管理
使用 MongoDatabaseFactory 的场景:
- Spring/Spring Boot 应用
- 需要与 Spring 事务集成
- 使用
MongoTemplate进行数据访问 - 需要异常转换到 Spring 的
DataAccessException - 多数据源配置
使用 MongoTemplate 的场景(间接使用两者):
- 日常的 CRUD 操作
- 需要对象-文档映射(ODM)
- 使用 Spring 的声明式事务
- 需要与其他 Spring 组件(如 Repository)集成
10. 最佳实践
JAVA
@Configuration
public class MongoConfig {
// 1. 配置一个完整的 MongoClient
@Bean
public MongoClient mongoClient(MongoProperties properties) {
MongoClientSettings settings = MongoClientSettings.builder()
.applyConnectionString(new ConnectionString(properties.getUri()))
.applyToConnectionPoolSettings(builder ->
builder.maxSize(100)
.minSize(10)
.maxWaitTime(2, TimeUnit.MINUTES))
.applyToSocketSettings(builder ->
builder.connectTimeout(5, TimeUnit.SECONDS)
.readTimeout(10, TimeUnit.SECONDS))
.build();
return MongoClients.create(settings);
}
// 2. 使用这个客户端创建工厂
@Bean
public MongoDatabaseFactory mongoDatabaseFactory(MongoClient mongoClient,
MongoProperties properties) {
String database = StringUtils.hasText(properties.getDatabase())
? properties.getDatabase()
: "default";
return new SimpleMongoClientDatabaseFactory(mongoClient, database);
}
// 3. 创建模板
@Bean
public MongoTemplate mongoTemplate(MongoDatabaseFactory factory,
MappingMongoConverter converter) {
return new MongoTemplate(factory, converter);
}
}
总结
核心区别:
MongoClient:MongoDB 官方的物理连接客户端,管理 TCP 连接、连接池、集群发现等MongoDatabaseFactory:Spring Data 的抽象工厂,封装了MongoClient,提供更高级的抽象和 Spring 集成
关系:MongoTemplate → 使用 → MongoDatabaseFactory → 使用 → MongoClient
在你的代码中:你同时配置了 MongoClient 和 MongoDatabaseFactory,但两者之间没有关联,导致创建了两个独立的连接。应该让 MongoDatabaseFactory 复用 MongoClient 的连接。
mongoTemplate.aggregate ?
mongoTemplate.aggregate 是 MongoDB 聚合管道操作的核心方法,用于执行复杂的数据分析和转换。让我详细解释:
1. 基本概念
JAVA
// 聚合管道就像数据处理流水线
Aggregation aggregation = Aggregation.newAggregation(
// 第一阶段
Aggregation.match(Criteria.where("status").is("active")),
// 第二阶段
Aggregation.group("category").sum("amount").as("totalAmount"),
// 第三阶段
Aggregation.sort(Sort.Direction.DESC, "totalAmount")
);
// 执行聚合
AggregationResults<Document> results = mongoTemplate.aggregate(
aggregation,
"orders", // 集合名
Document.class // 返回类型
);
2. 核心方法签名
JAVA
// 主要方法签名
<T> AggregationResults<T> aggregate(
Aggregation aggregation, // 聚合管道定义
String inputCollectionName, // 输入集合名
Class<T> outputType // 输出类型
);
<T> AggregationResults<T> aggregate(
Aggregation aggregation, // 聚合管道定义
Class<?> inputType, // 输入实体类
Class<T> outputType // 输出类型
);
<T> AggregationResults<T> aggregate(
TypedAggregation<?> aggregation, // 类型化聚合
Class<T> outputType // 输出类型
);
3. 完整的聚合阶段
3.1 所有聚合阶段
JAVA
// Spring Data MongoDB 支持的聚合阶段
Aggregation.match() // 过滤文档
Aggregation.group() // 分组统计
Aggregation.project() // 字段投影/计算
Aggregation.sort() // 排序
Aggregation.limit() // 限制数量
Aggregation.skip() // 跳过文档
Aggregation.unwind() // 展开数组
Aggregation.facet() // 多面聚合
Aggregation.lookup() // 左连接
Aggregation.graphLookup() // 图查找
Aggregation.addFields() // 添加字段
Aggregation.replaceRoot() // 替换根文档
Aggregation.count() // 计数
Aggregation.bucket() // 分组桶
Aggregation.bucketAuto() // 自动分组桶
Aggregation.sortByCount() // 按计数排序
Aggregation.sample() // 随机抽样
Aggregation.unionWith() // 合并集合
Aggregation.merge() // 合并到集合
4. 详细示例
4.1 基础聚合示例
JAVA
// 示例1:简单的分组统计
public List<CategoryTotal> getCategoryTotals() {
Aggregation aggregation = Aggregation.newAggregation(
// 1. 过滤条件
Aggregation.match(
Criteria.where("status").is("completed")
.and("amount").gte(100)
),
// 2. 按类别分组,计算总额
Aggregation.group("category")
.sum("amount").as("totalAmount")
.avg("amount").as("avgAmount")
.count().as("orderCount")
.first("category").as("categoryName"),
// 3. 投影:选择需要的字段
Aggregation.project("categoryName", "totalAmount", "avgAmount", "orderCount")
.andExclude("_id"),
// 4. 按总额降序排序
Aggregation.sort(Sort.Direction.DESC, "totalAmount"),
// 5. 只取前10个
Aggregation.limit(10)
);
return mongoTemplate.aggregate(
aggregation,
"orders",
CategoryTotal.class
).getMappedResults();
}
// 返回的 DTO
public class CategoryTotal {
private String categoryName;
private BigDecimal totalAmount;
private BigDecimal avgAmount;
private Integer orderCount;
// getter/setter
}
4.2 复杂聚合示例
JAVA
// 示例2:多阶段复杂聚合
public List<CustomerReport> getCustomerReport(Date startDate, Date endDate) {
Aggregation aggregation = Aggregation.newAggregation(
// 阶段1: 过滤
Aggregation.match(
Criteria.where("orderDate")
.gte(startDate)
.lte(endDate)
.and("status").in("completed", "shipped")
),
// 阶段2: 展开订单项数组
Aggregation.unwind("items"),
// 阶段3: 按客户和产品分组
Aggregation.group("customerId", "items.productId")
.sum("items.quantity").as("totalQuantity")
.sum("items.subtotal").as("totalSales")
.addToSet("items.productName").as("productNames"),
// 阶段4: 连接客户信息
Aggregation.lookup(
"customers", // 连接的目标集合
"_id.customerId", // 本地字段
"_id", // 目标集合字段
"customerInfo" // 输出字段名
),
// 阶段5: 展开客户信息
Aggregation.unwind("customerInfo", true), // preserveNullAndEmptyArrays = true
// 阶段6: 添加计算字段
Aggregation.addFields()
.addField("unitPrice")
.withValue(ArithmeticOperators.Divide
.valueOf("totalSales")
.divideBy("totalQuantity"))
.build(),
// 阶段7: 最终投影
Aggregation.project()
.and("_id.customerId").as("customerId")
.and("customerInfo.name").as("customerName")
.and("_id.productId").as("productId")
.and("productNames").arrayElementAt(0).as("productName")
.and("totalQuantity").as("quantity")
.and("totalSales").as("sales")
.and("unitPrice").as("unitPrice")
.andExclude("_id"),
// 阶段8: 排序
Aggregation.sort(Sort.by("customerName", "productName").ascending())
);
return mongoTemplate.aggregate(
aggregation,
"orders",
CustomerReport.class
).getMappedResults();
}
5. 聚合表达式
5.1 算术表达式
JAVA
// 使用算术表达式
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.project()
.and("price").as("originalPrice")
.and(ArithmeticOperators.Multiply
.valueOf("price")
.multiplyBy(0.8)) // 打8折
.as("discountedPrice")
.and(ArithmeticOperators.Subtract
.valueOf("price")
.subtract("cost"))
.as("profitMargin")
.and.as("totalPrice") // 使用表达式字符串
);
5.2 字符串表达式
JAVA
// 字符串操作
Aggregation.project()
.and(StringOperators.Concat
.valueOf("firstName")
.concat(" ")
.concatValueOf("lastName"))
.as("fullName")
.and(StringOperators.Substr
.valueOf("email")
.substring(0, 5))
.as("emailPrefix")
.and(StringOperators.ToLower
.valueOf("city"))
.as("cityLowerCase");
5.3 日期表达式
JAVA
// 日期操作
Aggregation.project()
.and(DateOperators.DateToString
.dateOf("orderDate")
.toString("%Y-%m-%d"))
.as("orderDateString")
.and(DateOperators.Year
.yearOf("orderDate"))
.as("orderYear")
.and(DateOperators.DayOfMonth
.dayOfMonth("orderDate"))
.as("orderDay");
6. 高级特性
6.1 Facet 多面聚合
JAVA
// 同一数据集的不同维度聚合
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.facet()
.and(
Aggregation.match(Criteria.where("status").is("active")),
Aggregation.count().as("activeCount")
).as("activeStats")
.and(
Aggregation.match(Criteria.where("amount").gte(1000)),
Aggregation.group("category").count().as("categoryCount"),
Aggregation.sort(Sort.Direction.DESC, "categoryCount")
).as("bigOrdersByCategory")
.and(
Aggregation.project()
.and").as("year")
.and").as("month"),
Aggregation.group("year", "month")
.sum("amount").as("monthlyTotal")
).as("monthlyTotals")
);
AggregationResults<Document> results = mongoTemplate.aggregate(
aggregation,
"orders",
Document.class
);
6.2 Graph Lookup 图查找
JAVA
// 查询组织结构
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(Criteria.where("employeeId").is("123")),
Aggregation.graphLookup("employees")
.startWith("$managerId")
.connectFrom("managerId")
.connectTo("employeeId")
.as("managementChain")
.maxDepth(5) // 最大深度
.depthField("level")
);
// 返回指定员工的所有上级
List<Document> managementChain = results.getMappedResults();
6.3 Bucket 分组桶
JAVA
// 将数据分组到指定范围
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.bucket("amount")
.withBoundaries(0, 100, 500, 1000, 5000) // 价格区间
.withDefaultBucket("above_5000") // 默认桶
.andOutputCount().as("count")
.andOutput("amount").sum().as("totalAmount")
.andOutput("amount").avg().as("avgAmount")
);
7. 性能优化
7.1 索引优化
JAVA
// 确保聚合查询使用索引
@CompoundIndex(name = "status_date_idx", def = "{'status': 1, 'orderDate': -1}")
@Document(collection = "orders")
public class Order {
// ...
}
// 查询时使用索引字段
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(
Criteria.where("status").is("completed")
.and("orderDate").gte(startDate) // 使用复合索引字段
),
// ...
);
7.2 分页聚合
JAVA
// 聚合结果分页
public AggregationPage<OrderSummary> getOrderSummaryPage(int page, int size) {
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(Criteria.where("status").is("completed")),
Aggregation.group("category")
.sum("amount").as("totalAmount")
.count().as("orderCount"),
Aggregation.sort(Sort.Direction.DESC, "totalAmount"),
Aggregation.skip((long) page * size), // 分页
Aggregation.limit(size)
);
AggregationResults<OrderSummary> results = mongoTemplate.aggregate(
aggregation,
"orders",
OrderSummary.class
);
// 获取总数
long total = mongoTemplate.count(
Query.query(Criteria.where("status").is("completed")),
"orders"
);
return new AggregationPage<>(
results.getMappedResults(),
PageRequest.of(page, size),
total
);
}
8. 聚合结果处理
8.1 多种返回类型
JAVA
// 1. 返回 Document
AggregationResults<Document> docResults = mongoTemplate.aggregate(
aggregation, "orders", Document.class
);
for (Document doc : docResults) {
System.out.println(doc.toJson());
}
// 2. 返回自定义 DTO
AggregationResults<OrderStats> dtoResults = mongoTemplate.aggregate(
aggregation, "orders", OrderStats.class
);
List<OrderStats> stats = dtoResults.getMappedResults();
// 3. 返回 Map
AggregationResults<Map> mapResults = mongoTemplate.aggregate(
aggregation, "orders", Map.class
);
// 4. 返回原始输出
AggregationOutput rawOutput = mongoTemplate.aggregate(
aggregation, "orders", BasicDBObject.class
);
8.2 处理大数据集
JAVA
// 使用游标处理大量数据
public void processLargeAggregation() {
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(Criteria.where("status").is("active")),
Aggregation.group("category").sum("amount").as("total")
);
// 使用 cursor
aggregation.withOptions(
AggregationOptions.builder()
.cursor(new Document("batchSize", 1000)) // 批次大小
.allowDiskUse(true) // 允许使用磁盘
.build()
);
try (CloseableIterator<Document> iterator = mongoTemplate
.aggregateStream(aggregation, "orders", Document.class)) {
while (iterator.hasNext()) {
Document doc = iterator.next();
processDocument(doc);
}
}
}
9. 常见聚合模式
9.1 统计报表
JAVA
public SalesReport getSalesReport(DateRange range) {
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(
Criteria.where("orderDate")
.gte(range.getStartDate())
.lte(range.getEndDate())
),
// 按日期统计
Aggregation.facet()
.and(
Aggregation.project()
.and(DateOperators.DateToString
.dateOf("orderDate")
.toString("%Y-%m-%d")).as("date"),
Aggregation.group("date")
.sum("amount").as("dailyTotal")
.count().as("orderCount"),
Aggregation.sort(Sort.by("date").ascending())
).as("dailySales")
// 按产品统计
.and(
Aggregation.unwind("items"),
Aggregation.group("items.productId", "items.productName")
.sum("items.quantity").as("totalQuantity")
.sum("items.subtotal").as("totalSales"),
Aggregation.sort(Sort.by("totalSales").descending()),
Aggregation.limit(10)
).as("topProducts")
// 按地区统计
.and(
Aggregation.lookup("customers", "customerId", "_id", "customer"),
Aggregation.unwind("customer"),
Aggregation.group("customer.region")
.sum("amount").as("regionTotal")
.count().as("orderCount"),
Aggregation.sort(Sort.by("regionTotal").descending())
).as("salesByRegion")
);
AggregationResults<Document> results = mongoTemplate.aggregate(
aggregation,
"orders",
Document.class
);
return mapToSalesReport(results.getUniqueMappedResult());
}
9.2 数据清洗
JAVA
public void cleanAndTransformData() {
Aggregation aggregation = Aggregation.newAggregation(
// 1. 过滤无效数据
Aggregation.match(
Criteria.where("amount").gt(0)
.and("customerId").exists(true)
),
// 2. 标准化字段
Aggregation.addFields()
.addField("normalizedEmail")
.withValue(StringOperators.ToLower.valueOf("email"))
.build()
.addField("category")
.withValue(ConditionalOperators.ifNull("category").then("uncategorized"))
.build(),
// 3. 计算衍生字段
Aggregation.addFields()
.addField("profit")
.withValue(ArithmeticOperators.Subtract
.valueOf("amount")
.subtract("cost"))
.build()
.addField("profitMargin")
.withValue(ArithmeticOperators.Multiply
.valueOf(
ArithmeticOperators.Divide
.valueOf("profit")
.divideBy("amount")
)
.multiplyBy(100))
.build(),
// 4. 写入新集合
Aggregation.out("cleaned_orders")
);
mongoTemplate.aggregate(aggregation, "raw_orders", Document.class);
}
10. 调试和优化
10.1 查看生成的聚合管道
JAVA
// 打印聚合管道
Aggregation aggregation = ...;
String pipeline = aggregation.toString();
System.out.println("Aggregation Pipeline: " + pipeline);
// 或者获取 MongoDB 原生格式
List<Document> pipelineDoc = aggregation.toPipeline(Aggregation.DEFAULT_CONTEXT);
System.out.println("MongoDB Pipeline: " + pipelineDoc);
// 使用 explain
AggregationResults<Document> results = mongoTemplate.aggregate(
aggregation.withOptions(
AggregationOptions.builder()
.explain(true) // 返回执行计划
.build()
),
"orders",
Document.class
);
10.2 监控聚合性能
JAVA
@Repository
public class OrderRepository {
@Autowired
private MongoTemplate mongoTemplate;
public AggregationResults<Document> aggregateWithMetrics(Aggregation aggregation) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
try {
return mongoTemplate.aggregate(aggregation, "orders", Document.class);
} finally {
stopWatch.stop();
log.info("Aggregation executed in {} ms", stopWatch.getTotalTimeMillis());
}
}
}
11. 实际应用场景
11.1 电商分析
JAVA
public ECommerceAnalytics getAnalytics(LocalDate start, LocalDate end) {
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(
Criteria.where("orderDate").gte(start).lte(end)
),
Aggregation.facet()
// 总体统计
.and(
Aggregation.group()
.sum("amount").as("totalRevenue")
.count().as("totalOrders")
.addToSet("customerId").as("uniqueCustomers"),
Aggregation.project()
.and("totalRevenue").as("revenue")
.and("totalOrders").as("orders")
.and(ArrayOperators.Size.lengthOfArray("uniqueCustomers")).as("customers")
).as("overview")
// 转化率漏斗
.and(
Aggregation.group("status")
.count().as("count"),
Aggregation.sort(Sort.by("count").descending())
).as("funnel")
// 客户生命周期价值
.and(
Aggregation.group("customerId")
.sum("amount").as("lifetimeValue")
.count().as("orderCount")
.min("orderDate").as("firstOrderDate")
.max("orderDate").as("lastOrderDate"),
Aggregation.bucketAuto("lifetimeValue", 5) // 自动分成5个桶
).as("clvSegments")
);
return mongoTemplate.aggregate(
aggregation,
"orders",
ECommerceAnalytics.class
).getUniqueMappedResult();
}
总结
mongoTemplate.aggregate 是 MongoDB 中最强大的数据分析工具,支持:
- 复杂数据处理:多阶段管道处理
- 丰富聚合阶段:match、group、project、lookup 等
- 多种表达式:算术、字符串、日期、条件表达式
- 高级特性:Facet、Graph Lookup、Bucket
- 性能优化:索引、分页、游标
- 灵活输出:Document、DTO、Map 等
使用建议:
- 复杂业务逻辑优先考虑聚合而不是多次查询
- 使用索引优化 match 阶段
- 分阶段测试聚合管道
- 监控聚合性能
- 考虑使用
allowDiskUse(true)处理大数据集