MongoDatabaseFactory与MongoClient区别解析

小豆丁 19小时前 ⋅ 7 阅读
聊天记录
头像

MongoDatabaseFactory 和 MongoClient 区别是什么

头像

MongoDatabaseFactoryMongoClient 是 Spring Data MongoDB 中两个不同层次的核心组件。让我详细解释它们的区别:

1. 本质区别

特性 MongoClient MongoDatabaseFactory
层级 MongoDB 驱动层 Spring Data 抽象层
作用 建立物理连接,与 MongoDB 服务器通信 工厂模式,创建和管理数据库连接会话
类比 JDBC 中的 Connection Spring 的 DataSource
创建者 MongoDB 官方驱动 Spring Data MongoDB
线程安全 是(通常一个应用一个实例) 通常是(但由具体实现决定)

2. MongoClient:底层连接客户端

2.1 定义和职责

JAVA
// MongoDB 官方驱动提供的客户端接口
public interface MongoClient extends Closeable {
    // 获取指定数据库
    MongoDatabase getDatabase(String databaseName);
    
    // 获取所有数据库名
    List<String> listDatabaseNames();
    
    // 关闭连接
    void close();
    
    // 集群相关信息
    ClusterDescription getClusterDescription();
}

2.2 核心功能

JAVA
// 1. 建立物理连接
MongoClient client = MongoClients.create("mongodb://localhost:27017");

// 2. 连接管理
client.startSession();  // 创建会话
client.getClusterDescription();  // 获取集群信息
client.close();  // 关闭连接

// 3. 直接操作(不推荐在 Spring 中直接使用)
MongoDatabase db = client.getDatabase("mydb");
MongoCollection<Document> collection = db.getCollection("users");
collection.insertOne(new Document("name", "Alice"));

2.3 在你的代码中

JAVA
@Bean(name = TurnRightMongo.MONGO_CLIENT)
@Primary
public MongoClient turnRightMongoClient(
    @Qualifier(TurnRightMongo.MONGO_PROPERTIES) MongoProperties turnRightMongoProperties,
    Environment environment,
    ObjectProvider<MongoClientSettingsBuilderCustomizer> builderCustomizers,  // 未使用
    ObjectProvider<MongoClientSettings> settings  // 未使用
) {
    // 这里只是简单创建,但应该用更完整的配置
    return MongoClients.create(turnRightMongoProperties.getUri());
}

3. MongoDatabaseFactory:Spring 抽象工厂

3.1 定义和职责

JAVA
// Spring Data MongoDB 的工厂接口
public interface MongoDatabaseFactory {
    // 获取 MongoDatabase 实例
    MongoDatabase getMongoDatabase() throws DataAccessException;
    
    // 获取指定名称的数据库
    MongoDatabase getMongoDatabase(String dbName) throws DataAccessException;
    
    // 获取底层的 MongoClient
    MongoClient getMongoClient();
    
    // 获取持久化实体转换器
    PersistenceExceptionTranslator getExceptionTranslator();
}

3.2 核心实现

JAVA
// 主要实现类:SimpleMongoClientDatabaseFactory
public class SimpleMongoClientDatabaseFactory implements MongoDatabaseFactory {
    private final MongoClient mongoClient;
    private final String databaseName;
    private final boolean mongoInstanceCreated;
    
    public SimpleMongoClientDatabaseFactory(MongoClient mongoClient, String databaseName) {
        this.mongoClient = mongoClient;
        this.databaseName = databaseName;
        this.mongoInstanceCreated = false;
    }
    
    public SimpleMongoClientDatabaseFactory(String connectionString) {
        // 内部创建 MongoClient
        this.mongoClient = MongoClients.create(connectionString);
        this.databaseName = extractDatabaseName(connectionString);
        this.mongoInstanceCreated = true;
    }
    
    @Override
    public MongoDatabase getMongoDatabase() throws DataAccessException {
        return mongoClient.getDatabase(this.databaseName);
    }
}

3.3 在你的代码中

JAVA
@Bean(name = TurnRightMongo.MONGO_DATABASE_FACTORY)
@Primary
public MongoDatabaseFactory turnRightMongoDatabaseFactory(
    @Qualifier(TurnRightMongo.MONGO_PROPERTIES) MongoProperties turnRightMongoProperties
) {
    // 通过连接字符串直接创建工厂
    return new SimpleMongoClientDatabaseFactory(
        new ConnectionString(turnRightMongoProperties.getUri())
    );
}

4. 关系图和工作流程

┌─────────────────────────────────────────────────────────────┐
│                    应用程序代码                             │
│  UserRepository.save(user)                                 │
│  MongoTemplate.findOne(query, User.class)                  │
└─────────────┬───────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│                   MongoTemplate                            │
│  ┌─────────────────────────────────────────────────────┐  │
│  │  MongoDatabaseFactory factory;                     │  │
│  │                                                     │  │
│  │  public <T> T findOne(Query query, Class<T> clazz){│  │
│  │    // 1. 从工厂获取数据库                          │  │
│  │    MongoDatabase db = factory.getMongoDatabase();  │  │
│  │                                                     │  │
│  │    // 2. 执行查询                                  │  │
│  │    FindIterable<Document> result =                  │  │
│  │        db.getCollection("users").find(filter);      │  │
│  │  }                                                  │  │
│  └─────────────────────────────────────────────────────┘  │
└─────────────┬───────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│              MongoDatabaseFactory                          │
│  ┌─────────────────────────────────────────────────────┐  │
│  │  private MongoClient mongoClient;                  │  │
│  │  private String databaseName;                       │  │
│  │                                                     │  │
│  │  public MongoDatabase getMongoDatabase() {         │  │
│  │    // 从 MongoClient 获取数据库                     │  │
│  │    return mongoClient.getDatabase(databaseName);   │  │
│  │  }                                                  │  │
│  └─────────────────────────────────────────────────────┘  │
└─────────────┬───────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│                   MongoClient                              │
│  ┌─────────────────────────────────────────────────────┐  │
│  │  private final Cluster cluster;                     │  │
│  │  private final MongoClientSettings settings;        │  │
│  │  private final ConnectionPool connectionPool;       │  │
│  │                                                     │  │
│  │  public MongoDatabase getDatabase(String name) {    │  │
│  │    // 实际的网络通信                                 │  │
│  │    return new MongoDatabaseImpl(name, ...);         │  │
│  │  }                                                  │  │
│  └─────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘

5. 实际使用对比

5.1 直接使用 MongoClient(不推荐在 Spring 中)

JAVA
// 纯 MongoDB 驱动方式
MongoClient client = MongoClients.create("mongodb://localhost:27017");
MongoDatabase database = client.getDatabase("mydb");
MongoCollection<Document> collection = database.getCollection("users");

// 插入文档
Document doc = new Document("name", "John")
    .append("age", 30)
    .append("city", "New York");
collection.insertOne(doc);

// 查询
Document result = collection.find(eq("name", "John")).first();

// 需要手动处理异常、资源关闭等

5.2 通过 MongoDatabaseFactory(Spring 方式)

JAVA
@Service
public class UserService {
    
    @Autowired
    private MongoTemplate mongoTemplate;  // 内部使用 MongoDatabaseFactory
    
    public User findUser(String name) {
        // Spring 处理异常转换、资源管理等
        Query query = Query.query(Criteria.where("name").is(name));
        return mongoTemplate.findOne(query, User.class);
    }
}

5.3 MongoTemplate 内部实现

JAVA
public class MongoTemplate implements MongoOperations {
    
    private final MongoDatabaseFactory mongoDatabaseFactory;
    
    public <T> T findOne(Query query, Class<T> entityClass, String collectionName) {
        try {
            // 1. 从工厂获取数据库
            MongoDatabase db = mongoDatabaseFactory.getMongoDatabase();
            
            // 2. 执行操作
            MongoCollection<Document> collection = db.getCollection(collectionName);
            Document document = collection.find(query.getQueryObject()).first();
            
            // 3. 使用 MappingMongoConverter 转换结果
            return mappingMongoConverter.read(entityClass, document);
            
        } catch (Exception e) {
            // 4. Spring 异常转换
            throw translateException(e);
        }
    }
}

6. 配置示例对比

6.1 完整的 MongoClient 配置

JAVA
@Bean
public MongoClient mongoClient() {
    // 详细的客户端配置
    ConnectionString connectionString = new ConnectionString("mongodb://localhost:27017/mydb");
    
    MongoClientSettings settings = MongoClientSettings.builder()
        .applyConnectionString(connectionString)
        .applyToClusterSettings(builder -> 
            builder.serverSelectionTimeout(30, TimeUnit.SECONDS))
        .applyToConnectionPoolSettings(builder ->
            builder.maxSize(100)
                   .minSize(10)
                   .maxWaitTime(2, TimeUnit.MINUTES))
        .applyToSocketSettings(builder ->
            builder.connectTimeout(10, TimeUnit.SECONDS)
                   .readTimeout(10, TimeUnit.SECONDS))
        .codecRegistry(CodecRegistries.fromRegistries(
            MongoClientSettings.getDefaultCodecRegistry(),
            CodecRegistries.fromProviders(PojoCodecProvider.builder()
                .automatic(true)
                .build())
        ))
        .build();
    
    return MongoClients.create(settings);
}

6.2 完整的 MongoDatabaseFactory 配置

JAVA
@Bean
public MongoDatabaseFactory mongoDatabaseFactory(MongoClient mongoClient) {
    // 使用配置的客户端
    return new SimpleMongoClientDatabaseFactory(mongoClient, "mydb");
}

// 或者直接通过连接字符串
@Bean  
public MongoDatabaseFactory mongoDatabaseFactory() {
    return new SimpleMongoClientDatabaseFactory(
        "mongodb://user:password@localhost:27017,localhost:27018/mydb?replicaSet=rs0"
    );
}

7. 在你的配置中的问题

JAVA
// 问题:创建了两个独立的 MongoClient
@Bean
public MongoClient turnRightMongoClient(MongoProperties properties) {
    return MongoClients.create(properties.getUri());  // 第一个客户端
}

@Bean  
public MongoDatabaseFactory turnRightMongoDatabaseFactory(MongoProperties properties) {
    // 这里又创建了一个新的 MongoClient
    return new SimpleMongoClientDatabaseFactory(
        new ConnectionString(properties.getUri())  // 第二个客户端
    );
}

优化建议

JAVA
@Bean
public MongoClient turnRightMongoClient(MongoProperties properties) {
    // 配置完整的客户端
    MongoClientSettings settings = MongoClientSettings.builder()
        .applyConnectionString(new ConnectionString(properties.getUri()))
        .applyToConnectionPoolSettings(builder -> 
            builder.maxSize(50).minSize(5))
        .build();
    return MongoClients.create(settings);
}

@Bean
public MongoDatabaseFactory turnRightMongoDatabaseFactory(MongoClient mongoClient, 
                                                        MongoProperties properties) {
    // 复用同一个 MongoClient
    String databaseName = extractDatabaseName(properties.getUri());
    return new SimpleMongoClientDatabaseFactory(mongoClient, databaseName);
}

8. Spring Boot 自动配置的对比

8.1 Spring Boot 的默认配置

YAML
# application.yml
spring:
  data:
    mongodb:
      uri: mongodb://user:password@host1:27017,host2:27017/database
      # 或者分开配置
      host: localhost
      port: 27017
      database: mydb
      username: user
      password: pass

Spring Boot 自动配置会:

  1. 创建 MongoClient(配置连接池、超时等)
  2. 创建 MongoDatabaseFactory(使用上面的 MongoClient
  3. 创建 MongoTemplate(使用上面的 MongoDatabaseFactory

8.2 你的配置 vs Spring Boot 自动配置

特性 你的配置 Spring Boot 自动配置
MongoClient 创建 直接通过 URI 创建 通过 MongoClientSettingsBuilderCustomizer 自定义
连接池配置 自动配置连接池
SSL 配置 支持自动配置
监控指标 集成 Micrometer 监控
健康检查 自动注册健康指示器

9. 使用场景总结

使用 MongoClient 的场景:

  1. 非 Spring 环境下的纯 Java 应用
  2. 需要细粒度控制连接参数
  3. 直接使用 MongoDB 驱动功能
  4. 多线程环境下的连接管理

使用 MongoDatabaseFactory 的场景:

  1. Spring/Spring Boot 应用
  2. 需要与 Spring 事务集成
  3. 使用 MongoTemplate 进行数据访问
  4. 需要异常转换到 Spring 的 DataAccessException
  5. 多数据源配置

使用 MongoTemplate 的场景(间接使用两者):

  1. 日常的 CRUD 操作
  2. 需要对象-文档映射(ODM)
  3. 使用 Spring 的声明式事务
  4. 需要与其他 Spring 组件(如 Repository)集成

10. 最佳实践

JAVA
@Configuration
public class MongoConfig {
    
    // 1. 配置一个完整的 MongoClient
    @Bean
    public MongoClient mongoClient(MongoProperties properties) {
        MongoClientSettings settings = MongoClientSettings.builder()
            .applyConnectionString(new ConnectionString(properties.getUri()))
            .applyToConnectionPoolSettings(builder -> 
                builder.maxSize(100)
                       .minSize(10)
                       .maxWaitTime(2, TimeUnit.MINUTES))
            .applyToSocketSettings(builder ->
                builder.connectTimeout(5, TimeUnit.SECONDS)
                       .readTimeout(10, TimeUnit.SECONDS))
            .build();
        
        return MongoClients.create(settings);
    }
    
    // 2. 使用这个客户端创建工厂
    @Bean
    public MongoDatabaseFactory mongoDatabaseFactory(MongoClient mongoClient, 
                                                   MongoProperties properties) {
        String database = StringUtils.hasText(properties.getDatabase()) 
            ? properties.getDatabase() 
            : "default";
        return new SimpleMongoClientDatabaseFactory(mongoClient, database);
    }
    
    // 3. 创建模板
    @Bean
    public MongoTemplate mongoTemplate(MongoDatabaseFactory factory,
                                      MappingMongoConverter converter) {
        return new MongoTemplate(factory, converter);
    }
}

总结

核心区别

  • MongoClient:MongoDB 官方的物理连接客户端,管理 TCP 连接、连接池、集群发现等
  • MongoDatabaseFactory:Spring Data 的抽象工厂,封装了 MongoClient,提供更高级的抽象和 Spring 集成

关系MongoTemplate → 使用 → MongoDatabaseFactory → 使用 → MongoClient

在你的代码中:你同时配置了 MongoClientMongoDatabaseFactory,但两者之间没有关联,导致创建了两个独立的连接。应该让 MongoDatabaseFactory 复用 MongoClient 的连接。

头像

mongoTemplate.aggregate ?

头像

mongoTemplate.aggregateMongoDB 聚合管道操作的核心方法,用于执行复杂的数据分析和转换。让我详细解释:

1. 基本概念

JAVA
// 聚合管道就像数据处理流水线
Aggregation aggregation = Aggregation.newAggregation(
    // 第一阶段
    Aggregation.match(Criteria.where("status").is("active")),
    
    // 第二阶段  
    Aggregation.group("category").sum("amount").as("totalAmount"),
    
    // 第三阶段
    Aggregation.sort(Sort.Direction.DESC, "totalAmount")
);

// 执行聚合
AggregationResults<Document> results = mongoTemplate.aggregate(
    aggregation, 
    "orders",  // 集合名
    Document.class  // 返回类型
);

2. 核心方法签名

JAVA
// 主要方法签名
<T> AggregationResults<T> aggregate(
    Aggregation aggregation,          // 聚合管道定义
    String inputCollectionName,      // 输入集合名
    Class<T> outputType              // 输出类型
);

<T> AggregationResults<T> aggregate(
    Aggregation aggregation,          // 聚合管道定义
    Class<?> inputType,              // 输入实体类
    Class<T> outputType              // 输出类型
);

<T> AggregationResults<T> aggregate(
    TypedAggregation<?> aggregation,  // 类型化聚合
    Class<T> outputType              // 输出类型
);

3. 完整的聚合阶段

3.1 所有聚合阶段

JAVA
// Spring Data MongoDB 支持的聚合阶段
Aggregation.match()        // 过滤文档
Aggregation.group()        // 分组统计
Aggregation.project()      // 字段投影/计算
Aggregation.sort()         // 排序
Aggregation.limit()        // 限制数量
Aggregation.skip()         // 跳过文档
Aggregation.unwind()       // 展开数组
Aggregation.facet()        // 多面聚合
Aggregation.lookup()       // 左连接
Aggregation.graphLookup()  // 图查找
Aggregation.addFields()    // 添加字段
Aggregation.replaceRoot()  // 替换根文档
Aggregation.count()        // 计数
Aggregation.bucket()       // 分组桶
Aggregation.bucketAuto()   // 自动分组桶
Aggregation.sortByCount()  // 按计数排序
Aggregation.sample()       // 随机抽样
Aggregation.unionWith()    // 合并集合
Aggregation.merge()        // 合并到集合

4. 详细示例

4.1 基础聚合示例

JAVA
// 示例1:简单的分组统计
public List<CategoryTotal> getCategoryTotals() {
    Aggregation aggregation = Aggregation.newAggregation(
        // 1. 过滤条件
        Aggregation.match(
            Criteria.where("status").is("completed")
                   .and("amount").gte(100)
        ),
        
        // 2. 按类别分组,计算总额
        Aggregation.group("category")
            .sum("amount").as("totalAmount")
            .avg("amount").as("avgAmount")
            .count().as("orderCount")
            .first("category").as("categoryName"),
        
        // 3. 投影:选择需要的字段
        Aggregation.project("categoryName", "totalAmount", "avgAmount", "orderCount")
            .andExclude("_id"),
        
        // 4. 按总额降序排序
        Aggregation.sort(Sort.Direction.DESC, "totalAmount"),
        
        // 5. 只取前10个
        Aggregation.limit(10)
    );
    
    return mongoTemplate.aggregate(
        aggregation, 
        "orders", 
        CategoryTotal.class
    ).getMappedResults();
}

// 返回的 DTO
public class CategoryTotal {
    private String categoryName;
    private BigDecimal totalAmount;
    private BigDecimal avgAmount;
    private Integer orderCount;
    // getter/setter
}

4.2 复杂聚合示例

JAVA
// 示例2:多阶段复杂聚合
public List<CustomerReport> getCustomerReport(Date startDate, Date endDate) {
    Aggregation aggregation = Aggregation.newAggregation(
        // 阶段1: 过滤
        Aggregation.match(
            Criteria.where("orderDate")
                .gte(startDate)
                .lte(endDate)
                .and("status").in("completed", "shipped")
        ),
        
        // 阶段2: 展开订单项数组
        Aggregation.unwind("items"),
        
        // 阶段3: 按客户和产品分组
        Aggregation.group("customerId", "items.productId")
            .sum("items.quantity").as("totalQuantity")
            .sum("items.subtotal").as("totalSales")
            .addToSet("items.productName").as("productNames"),
        
        // 阶段4: 连接客户信息
        Aggregation.lookup(
            "customers",        // 连接的目标集合
            "_id.customerId",   // 本地字段
            "_id",              // 目标集合字段
            "customerInfo"      // 输出字段名
        ),
        
        // 阶段5: 展开客户信息
        Aggregation.unwind("customerInfo", true),  // preserveNullAndEmptyArrays = true
        
        // 阶段6: 添加计算字段
        Aggregation.addFields()
            .addField("unitPrice")
            .withValue(ArithmeticOperators.Divide
                .valueOf("totalSales")
                .divideBy("totalQuantity"))
            .build(),
        
        // 阶段7: 最终投影
        Aggregation.project()
            .and("_id.customerId").as("customerId")
            .and("customerInfo.name").as("customerName")
            .and("_id.productId").as("productId")
            .and("productNames").arrayElementAt(0).as("productName")
            .and("totalQuantity").as("quantity")
            .and("totalSales").as("sales")
            .and("unitPrice").as("unitPrice")
            .andExclude("_id"),
        
        // 阶段8: 排序
        Aggregation.sort(Sort.by("customerName", "productName").ascending())
    );
    
    return mongoTemplate.aggregate(
        aggregation, 
        "orders", 
        CustomerReport.class
    ).getMappedResults();
}

5. 聚合表达式

5.1 算术表达式

JAVA
// 使用算术表达式
Aggregation aggregation = Aggregation.newAggregation(
    Aggregation.project()
        .and("price").as("originalPrice")
        .and(ArithmeticOperators.Multiply
            .valueOf("price")
            .multiplyBy(0.8))  // 打8折
            .as("discountedPrice")
        .and(ArithmeticOperators.Subtract
            .valueOf("price")
            .subtract("cost"))
            .as("profitMargin")
        .and.as("totalPrice")  // 使用表达式字符串
);

5.2 字符串表达式

JAVA
// 字符串操作
Aggregation.project()
    .and(StringOperators.Concat
        .valueOf("firstName")
        .concat(" ")
        .concatValueOf("lastName"))
        .as("fullName")
    .and(StringOperators.Substr
        .valueOf("email")
        .substring(0, 5))
        .as("emailPrefix")
    .and(StringOperators.ToLower
        .valueOf("city"))
        .as("cityLowerCase");

5.3 日期表达式

JAVA
// 日期操作
Aggregation.project()
    .and(DateOperators.DateToString
        .dateOf("orderDate")
        .toString("%Y-%m-%d"))
        .as("orderDateString")
    .and(DateOperators.Year
        .yearOf("orderDate"))
        .as("orderYear")
    .and(DateOperators.DayOfMonth
        .dayOfMonth("orderDate"))
        .as("orderDay");

6. 高级特性

6.1 Facet 多面聚合

JAVA
// 同一数据集的不同维度聚合
Aggregation aggregation = Aggregation.newAggregation(
    Aggregation.facet()
        .and(
            Aggregation.match(Criteria.where("status").is("active")),
            Aggregation.count().as("activeCount")
        ).as("activeStats")
        .and(
            Aggregation.match(Criteria.where("amount").gte(1000)),
            Aggregation.group("category").count().as("categoryCount"),
            Aggregation.sort(Sort.Direction.DESC, "categoryCount")
        ).as("bigOrdersByCategory")
        .and(
            Aggregation.project()
                .and").as("year")
                .and").as("month"),
            Aggregation.group("year", "month")
                .sum("amount").as("monthlyTotal")
        ).as("monthlyTotals")
);

AggregationResults<Document> results = mongoTemplate.aggregate(
    aggregation, 
    "orders", 
    Document.class
);

6.2 Graph Lookup 图查找

JAVA
// 查询组织结构
Aggregation aggregation = Aggregation.newAggregation(
    Aggregation.match(Criteria.where("employeeId").is("123")),
    
    Aggregation.graphLookup("employees")
        .startWith("$managerId")
        .connectFrom("managerId")
        .connectTo("employeeId")
        .as("managementChain")
        .maxDepth(5)  // 最大深度
        .depthField("level")
);

// 返回指定员工的所有上级
List<Document> managementChain = results.getMappedResults();

6.3 Bucket 分组桶

JAVA
// 将数据分组到指定范围
Aggregation aggregation = Aggregation.newAggregation(
    Aggregation.bucket("amount")
        .withBoundaries(0, 100, 500, 1000, 5000)  // 价格区间
        .withDefaultBucket("above_5000")           // 默认桶
        .andOutputCount().as("count")
        .andOutput("amount").sum().as("totalAmount")
        .andOutput("amount").avg().as("avgAmount")
);

7. 性能优化

7.1 索引优化

JAVA
// 确保聚合查询使用索引
@CompoundIndex(name = "status_date_idx", def = "{'status': 1, 'orderDate': -1}")
@Document(collection = "orders")
public class Order {
    // ...
}

// 查询时使用索引字段
Aggregation aggregation = Aggregation.newAggregation(
    Aggregation.match(
        Criteria.where("status").is("completed")
               .and("orderDate").gte(startDate)  // 使用复合索引字段
    ),
    // ...
);

7.2 分页聚合

JAVA
// 聚合结果分页
public AggregationPage<OrderSummary> getOrderSummaryPage(int page, int size) {
    Aggregation aggregation = Aggregation.newAggregation(
        Aggregation.match(Criteria.where("status").is("completed")),
        Aggregation.group("category")
            .sum("amount").as("totalAmount")
            .count().as("orderCount"),
        Aggregation.sort(Sort.Direction.DESC, "totalAmount"),
        Aggregation.skip((long) page * size),  // 分页
        Aggregation.limit(size)
    );
    
    AggregationResults<OrderSummary> results = mongoTemplate.aggregate(
        aggregation, 
        "orders", 
        OrderSummary.class
    );
    
    // 获取总数
    long total = mongoTemplate.count(
        Query.query(Criteria.where("status").is("completed")), 
        "orders"
    );
    
    return new AggregationPage<>(
        results.getMappedResults(),
        PageRequest.of(page, size),
        total
    );
}

8. 聚合结果处理

8.1 多种返回类型

JAVA
// 1. 返回 Document
AggregationResults<Document> docResults = mongoTemplate.aggregate(
    aggregation, "orders", Document.class
);
for (Document doc : docResults) {
    System.out.println(doc.toJson());
}

// 2. 返回自定义 DTO
AggregationResults<OrderStats> dtoResults = mongoTemplate.aggregate(
    aggregation, "orders", OrderStats.class
);
List<OrderStats> stats = dtoResults.getMappedResults();

// 3. 返回 Map
AggregationResults<Map> mapResults = mongoTemplate.aggregate(
    aggregation, "orders", Map.class
);

// 4. 返回原始输出
AggregationOutput rawOutput = mongoTemplate.aggregate(
    aggregation, "orders", BasicDBObject.class
);

8.2 处理大数据集

JAVA
// 使用游标处理大量数据
public void processLargeAggregation() {
    Aggregation aggregation = Aggregation.newAggregation(
        Aggregation.match(Criteria.where("status").is("active")),
        Aggregation.group("category").sum("amount").as("total")
    );
    
    // 使用 cursor
    aggregation.withOptions(
        AggregationOptions.builder()
            .cursor(new Document("batchSize", 1000))  // 批次大小
            .allowDiskUse(true)  // 允许使用磁盘
            .build()
    );
    
    try (CloseableIterator<Document> iterator = mongoTemplate
            .aggregateStream(aggregation, "orders", Document.class)) {
        while (iterator.hasNext()) {
            Document doc = iterator.next();
            processDocument(doc);
        }
    }
}

9. 常见聚合模式

9.1 统计报表

JAVA
public SalesReport getSalesReport(DateRange range) {
    Aggregation aggregation = Aggregation.newAggregation(
        Aggregation.match(
            Criteria.where("orderDate")
                .gte(range.getStartDate())
                .lte(range.getEndDate())
        ),
        
        // 按日期统计
        Aggregation.facet()
            .and(
                Aggregation.project()
                    .and(DateOperators.DateToString
                        .dateOf("orderDate")
                        .toString("%Y-%m-%d")).as("date"),
                Aggregation.group("date")
                    .sum("amount").as("dailyTotal")
                    .count().as("orderCount"),
                Aggregation.sort(Sort.by("date").ascending())
            ).as("dailySales")
            
            // 按产品统计
            .and(
                Aggregation.unwind("items"),
                Aggregation.group("items.productId", "items.productName")
                    .sum("items.quantity").as("totalQuantity")
                    .sum("items.subtotal").as("totalSales"),
                Aggregation.sort(Sort.by("totalSales").descending()),
                Aggregation.limit(10)
            ).as("topProducts")
            
            // 按地区统计
            .and(
                Aggregation.lookup("customers", "customerId", "_id", "customer"),
                Aggregation.unwind("customer"),
                Aggregation.group("customer.region")
                    .sum("amount").as("regionTotal")
                    .count().as("orderCount"),
                Aggregation.sort(Sort.by("regionTotal").descending())
            ).as("salesByRegion")
    );
    
    AggregationResults<Document> results = mongoTemplate.aggregate(
        aggregation, 
        "orders", 
        Document.class
    );
    
    return mapToSalesReport(results.getUniqueMappedResult());
}

9.2 数据清洗

JAVA
public void cleanAndTransformData() {
    Aggregation aggregation = Aggregation.newAggregation(
        // 1. 过滤无效数据
        Aggregation.match(
            Criteria.where("amount").gt(0)
                   .and("customerId").exists(true)
        ),
        
        // 2. 标准化字段
        Aggregation.addFields()
            .addField("normalizedEmail")
            .withValue(StringOperators.ToLower.valueOf("email"))
            .build()
            .addField("category")
            .withValue(ConditionalOperators.ifNull("category").then("uncategorized"))
            .build(),
        
        // 3. 计算衍生字段
        Aggregation.addFields()
            .addField("profit")
            .withValue(ArithmeticOperators.Subtract
                .valueOf("amount")
                .subtract("cost"))
            .build()
            .addField("profitMargin")
            .withValue(ArithmeticOperators.Multiply
                .valueOf(
                    ArithmeticOperators.Divide
                        .valueOf("profit")
                        .divideBy("amount")
                )
                .multiplyBy(100))
            .build(),
        
        // 4. 写入新集合
        Aggregation.out("cleaned_orders")
    );
    
    mongoTemplate.aggregate(aggregation, "raw_orders", Document.class);
}

10. 调试和优化

10.1 查看生成的聚合管道

JAVA
// 打印聚合管道
Aggregation aggregation = ...;
String pipeline = aggregation.toString();
System.out.println("Aggregation Pipeline: " + pipeline);

// 或者获取 MongoDB 原生格式
List<Document> pipelineDoc = aggregation.toPipeline(Aggregation.DEFAULT_CONTEXT);
System.out.println("MongoDB Pipeline: " + pipelineDoc);

// 使用 explain
AggregationResults<Document> results = mongoTemplate.aggregate(
    aggregation.withOptions(
        AggregationOptions.builder()
            .explain(true)  // 返回执行计划
            .build()
    ),
    "orders",
    Document.class
);

10.2 监控聚合性能

JAVA
@Repository
public class OrderRepository {
    
    @Autowired
    private MongoTemplate mongoTemplate;
    
    public AggregationResults<Document> aggregateWithMetrics(Aggregation aggregation) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        
        try {
            return mongoTemplate.aggregate(aggregation, "orders", Document.class);
        } finally {
            stopWatch.stop();
            log.info("Aggregation executed in {} ms", stopWatch.getTotalTimeMillis());
        }
    }
}

11. 实际应用场景

11.1 电商分析

JAVA
public ECommerceAnalytics getAnalytics(LocalDate start, LocalDate end) {
    Aggregation aggregation = Aggregation.newAggregation(
        Aggregation.match(
            Criteria.where("orderDate").gte(start).lte(end)
        ),
        
        Aggregation.facet()
            // 总体统计
            .and(
                Aggregation.group()
                    .sum("amount").as("totalRevenue")
                    .count().as("totalOrders")
                    .addToSet("customerId").as("uniqueCustomers"),
                Aggregation.project()
                    .and("totalRevenue").as("revenue")
                    .and("totalOrders").as("orders")
                    .and(ArrayOperators.Size.lengthOfArray("uniqueCustomers")).as("customers")
            ).as("overview")
            
            // 转化率漏斗
            .and(
                Aggregation.group("status")
                    .count().as("count"),
                Aggregation.sort(Sort.by("count").descending())
            ).as("funnel")
            
            // 客户生命周期价值
            .and(
                Aggregation.group("customerId")
                    .sum("amount").as("lifetimeValue")
                    .count().as("orderCount")
                    .min("orderDate").as("firstOrderDate")
                    .max("orderDate").as("lastOrderDate"),
                Aggregation.bucketAuto("lifetimeValue", 5)  // 自动分成5个桶
            ).as("clvSegments")
    );
    
    return mongoTemplate.aggregate(
        aggregation, 
        "orders", 
        ECommerceAnalytics.class
    ).getUniqueMappedResult();
}

总结

mongoTemplate.aggregate 是 MongoDB 中最强大的数据分析工具,支持:

  1. 复杂数据处理:多阶段管道处理
  2. 丰富聚合阶段:match、group、project、lookup 等
  3. 多种表达式:算术、字符串、日期、条件表达式
  4. 高级特性:Facet、Graph Lookup、Bucket
  5. 性能优化:索引、分页、游标
  6. 灵活输出:Document、DTO、Map 等

使用建议

  • 复杂业务逻辑优先考虑聚合而不是多次查询
  • 使用索引优化 match 阶段
  • 分阶段测试聚合管道
  • 监控聚合性能
  • 考虑使用 allowDiskUse(true) 处理大数据集

全部评论: 0

    我有话说: