消息重复投递导致MongoDB乐观锁异常

小豆丁 10小时前 ⋅ 3 阅读
聊天记录
头像

17:38:29.353 2026-04-15 17:38:29.353 [pool-9-thread-32] INFO  [c.g.t.a.l.o.OrderTextAiCheckNotificationListener] - handleFilterResultRecordNotification-开始 notification={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.18.238","mqMsgSendTime":1776245880074,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435542927080424275,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"天安门城楼123","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} turnright-worker testing 17:38:29.362 2026-04-15 17:38:29.362 [pool-9-thread-32] ERROR [m.i.s.n.d.AmqpNotificationListenerManager] - 通知处理器抛出异常 logUUid=8734bd88b8ae4de3af8c578367248e8c, 队列=NOTIFICATION.V1.FilterResultRecordNotification#handleFilterResultRecordNotificationTurnWrightWorker, 消息体={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.18.238","mqMsgSendTime":1776245880074,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435542927080424275,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"天安门城楼123","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} org.springframework.dao.OptimisticLockingFailureException: Cannot save entity 435542989361119823 with version 1 to collection OrderTextAiCheck; Has it been modified meanwhile org.springframework.data.mongodb.core.MongoTemplate.doSaveVersioned(MongoTemplate.java:1509) at org.springframework.data.mongodb.core.MongoTemplate.save(MongoTemplate.java:1472) at org.springframework.data.mongodb.repository.support.SimpleMongoRepository.save(SimpleMongoRepository.java:99) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.data.repository.core.support.RepositoryMethodInvoker$RepositoryFragmentMethodInvoker.lambda$new$0(RepositoryMethodInvoker.java:277) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.doInvoke(RepositoryMethodInvoker.java:170) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.invoke(RepositoryMethodInvoker.java:158) at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:515) at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:284) at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:731) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:174) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:149) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:69) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.mongodb.repository.support.CrudMethodMetadataPostProcessor$CrudMethodMetadataPopulatingMethodInterceptor.invoke(CrudMethodMetadataPostProcessor.java:158) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) at jdk.proxy3/jdk.proxy3.$Proxy580.save(Unknown Source) at com.gongqibing.turnright.application.listener.order.OrderTextAiCheckNotificationListener.handleFilterResultRecordNotificationTurnWrightWorker(OrderTextAiCheckNotificationListener.java:284) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at com.gongqibing.turnright.application.web.idempotent.concurrent.ConcurrentInterceptor.invoke(ConcurrentInterceptor.java:89) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:728) at com.gongqibing.turnright.application.listener.order.OrderTextAiCheckNotificationListener$$SpringCGLIB$$0.handleFilterResultRecordNotificationTurnWrightWorker(<generated>) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at me.insidezhou.southernquiet.notification.driver.AmqpNotificationListenerManager.onMessageReceived(AmqpNotificationListenerManager.java:260) at me.insidezhou.southernquiet.notification.driver.AmqpNotificationListenerManager.lambda$generateMessageListener$6(AmqpNotificationListenerManager.java:211) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1694) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1616) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:114) at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:357) at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:246) at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:135) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) at org.springframework.amqp.rabbit.listener.$Proxy1380.invokeListener(Unknown Source) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1604) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1595) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1540) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1521) at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:1197) at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:1154) at org.apache.skywalking.apm.plugin.rabbitmq.TracerConsumer.handleDelivery(TracerConsumer.java:96) at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:111) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) at java.base/java.lang.Thread.run(Thread.java:1583) turnright-worker testing 17:38:29.365 2026-04-15 17:38:29.365 [pool-9-thread-28] INFO  [c.g.t.a.l.o.OrderTextAiCheckNotificationListener] - handleFilterResultRecordNotification-开始 notification={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.18.238","mqMsgSendTime":1776245880074,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435542927080424275,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"天安门城楼123","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} turnright-worker testing 17:38:29.373 2026-04-15 17:38:29.373 [pool-9-thread-28] ERROR [m.i.s.n.d.AmqpNotificationListenerManager] - 通知处理器抛出异常 logUUid=44f374fd671643bdb74d4bd6052cbcf4, 队列=NOTIFICATION.V1.FilterResultRecordNotification#handleFilterResultRecordNotificationTurnWrightWorker, 消息体={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.18.238","mqMsgSendTime":1776245880074,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435542927080424275,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"天安门城楼123","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} org.springframework.dao.OptimisticLockingFailureException: Cannot save entity 435542989361119824 with version 1 to collection OrderTextAiCheck; Has it been modified meanwhile org.springframework.data.mongodb.core.MongoTemplate.doSaveVersioned(MongoTemplate.java:1509) at org.springframework.data.mongodb.core.MongoTemplate.save(MongoTemplate.java:1472) at org.springframework.data.mongodb.repository.support.SimpleMongoRepository.save(SimpleMongoRepository.java:99) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.data.repository.core.support.RepositoryMethodInvoker$RepositoryFragmentMethodInvoker.lambda$new$0(RepositoryMethodInvoker.java:277) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.doInvoke(RepositoryMethodInvoker.java:170) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.invoke(RepositoryMethodInvoker.java:158) at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:515) at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:284) at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:731) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:174) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:149) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:69) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.mongodb.repository.support.CrudMethodMetadataPostProcessor$CrudMethodMetadataPopulatingMethodInterceptor.invoke(CrudMethodMetadataPostProcessor.java:158) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) at jdk.proxy3/jdk.proxy3.$Proxy580.save(Unknown Source) at com.gongqibing.turnright.application.listener.order.OrderTextAiCheckNotificationListener.handleFilterResultRecordNotificationTurnWrightWorker(OrderTextAiCheckNotificationListener.java:284) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at com.gongqibing.turnright.application.web.idempotent.concurrent.ConcurrentInterceptor.invoke(ConcurrentInterceptor.java:89) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:728) at com.gongqibing.turnright.application.listener.order.OrderTextAiCheckNotificationListener$$SpringCGLIB$$0.handleFilterResultRecordNotificationTurnWrightWorker(<generated>) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at me.insidezhou.southernquiet.notification.driver.AmqpNotificationListenerManager.onMessageReceived(AmqpNotificationListenerManager.java:260) at me.insidezhou.southernquiet.notification.driver.AmqpNotificationListenerManager.lambda$generateMessageListener$6(AmqpNotificationListenerManager.java:211) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1694) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1616) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:114) at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:357) at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:246) at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:135) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) at org.springframework.amqp.rabbit.listener.$Proxy1380.invokeListener(Unknown Source) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1604) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1595) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1540) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1521) at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:1197) at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:1154) at org.apache.skywalking.apm.plugin.rabbitmq.TracerConsumer.handleDelivery(TracerConsumer.java:96) at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:111) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) at java.base/java.lang.Thread.run(Thread.java:1583) turnright-worker testing 17:38:30.656 2026-04-15 17:38:30.655 [pool-9-thread-34] INFO  [c.g.t.a.l.o.OrderTextAiCheckNotificationListener] - handleFilterResultRecordNotification-开始 notification={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.245.158","mqMsgSendTime":1776241693242,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435533907649102714,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} turnright-worker testing 17:38:30.662 2026-04-15 17:38:30.662 [pool-9-thread-34] ERROR [m.i.s.n.d.AmqpNotificationListenerManager] - 通知处理器抛出异常 logUUid=f783d1293f574d9b965dee04a73be3e7, 队列=NOTIFICATION.V1.FilterResultRecordNotification#handleFilterResultRecordNotificationTurnWrightWorker, 消息体={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.245.158","mqMsgSendTime":1776241693242,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435533907649102714,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} org.springframework.dao.OptimisticLockingFailureException: Cannot save entity 435542991508603396 with version 1 to collection OrderTextAiCheck; Has it been modified meanwhile org.springframework.data.mongodb.core.MongoTemplate.doSaveVersioned(MongoTemplate.java:1509) at org.springframework.data.mongodb.core.MongoTemplate.save(MongoTemplate.java:1472) at org.springframework.data.mongodb.repository.support.SimpleMongoRepository.save(SimpleMongoRepository.java:99) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.data.repository.core.support.RepositoryMethodInvoker$RepositoryFragmentMethodInvoker.lambda$new$0(RepositoryMethodInvoker.java:277) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.doInvoke(RepositoryMethodInvoker.java:170) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.invoke(RepositoryMethodInvoker.java:158) at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:515) at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:284) at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:731) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:174) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:149) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:69) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.mongodb.repository.support.CrudMethodMetadataPostProcessor$CrudMethodMetadataPopulatingMethodInterceptor.invoke(CrudMethodMetadataPostProcessor.java:158) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) at jdk.proxy3/jdk.proxy3.$Proxy580.save(Unknown Source) at com.gongqibing.turnright.application.listener.order.OrderTextAiCheckNotificationListener.handleFilterResultRecordNotificationTurnWrightWorker(OrderTextAiCheckNotificationListener.java:284) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at com.gongqibing.turnright.application.web.idempotent.concurrent.ConcurrentInterceptor.invoke(ConcurrentInterceptor.java:89) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:728) at com.gongqibing.turnright.application.listener.order.OrderTextAiCheckNotificationListener$$SpringCGLIB$$0.handleFilterResultRecordNotificationTurnWrightWorker(<generated>) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at me.insidezhou.southernquiet.notification.driver.AmqpNotificationListenerManager.onMessageReceived(AmqpNotificationListenerManager.java:260) at me.insidezhou.southernquiet.notification.driver.AmqpNotificationListenerManager.lambda$generateMessageListener$6(AmqpNotificationListenerManager.java:211) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1694) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1616) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:114) at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:357) at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:246) at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:135) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) at org.springframework.amqp.rabbit.listener.$Proxy1380.invokeListener(Unknown Source) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1604) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1595) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1540) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1521) at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:1197) at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:1154) at org.apache.skywalking.apm.plugin.rabbitmq.TracerConsumer.handleDelivery(TracerConsumer.java:96) at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:111) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) at java.base/java.lang.Thread.run(Thread.java:1583) turnright-worker testing 17:38:30.664 2026-04-15 17:38:30.664 [pool-9-thread-4] INFO  [c.g.t.a.l.o.OrderTextAiCheckNotificationListener] - handleFilterResultRecordNotification-开始 notification={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.245.158","mqMsgSendTime":1776241693242,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435533907649102714,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} turnright-worker testing 17:38:30.672 2026-04-15 17:38:30.672 [pool-9-thread-4] ERROR [m.i.s.n.d.AmqpNotificationListenerManager] - 通知处理器抛出异常 logUUid=12042b1392364e39bdfbb1e575366ee9, 队列=NOTIFICATION.V1.FilterResultRecordNotification#handleFilterResultRecordNotificationTurnWrightWorker, 消息体={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.245.158","mqMsgSendTime":1776241693242,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435533907649102714,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} org.springframework.dao.OptimisticLockingFailureException: Cannot save entity 435542991508603397 with version 1 to collection OrderTextAiCheck; Has it been modified meanwhile org.springframework.data.mongodb.core.MongoTemplate.doSaveVersioned(MongoTemplate.java:1509) at org.springframework.data.mongodb.core.MongoTemplate.save(MongoTemplate.java:1472) at org.springframework.data.mongodb.repository.support.SimpleMongoRepository.save(SimpleMongoRepository.java:99) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.data.repository.core.support.RepositoryMethodInvoker$RepositoryFragmentMethodInvoker.lambda$new$0(RepositoryMethodInvoker.java:277) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.doInvoke(RepositoryMethodInvoker.java:170) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.invoke(RepositoryMethodInvoker.java:158) at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:515) at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:284) at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:731) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:174) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:149) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:69) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.mongodb.repository.support.CrudMethodMetadataPostProcessor$CrudMethodMetadataPopulatingMethodInterceptor.invoke(CrudMethodMetadataPostProcessor.java:158) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) at jdk.proxy3/jdk.proxy3.$Proxy580.save(Unknown Source) at com.gongqibing.turnright.application.listener.order.OrderTextAiCheckNotificationListener.handleFilterResultRecordNotificationTurnWrightWorker(OrderTextAiCheckNotificationListener.java:284) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at com.gongqibing.turnright.application.web.idempotent.concurrent.ConcurrentInterceptor.invoke(ConcurrentInterceptor.java:89) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:728) at com.gongqibing.turnright.application.listener.order.OrderTextAiCheckNotificationListener$$SpringCGLIB$$0.handleFilterResultRecordNotificationTurnWrightWorker(<generated>) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at me.insidezhou.southernquiet.notification.driver.AmqpNotificationListenerManager.onMessageReceived(AmqpNotificationListenerManager.java:260) at me.insidezhou.southernquiet.notification.driver.AmqpNotificationListenerManager.lambda$generateMessageListener$6(AmqpNotificationListenerManager.java:211) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1694) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1616) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:114) at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:357) at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:246) at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:135) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) at org.springframework.amqp.rabbit.listener.$Proxy1380.invokeListener(Unknown Source) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1604) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1595) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1540) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1521) at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:1197) at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:1154) at org.apache.skywalking.apm.plugin.rabbitmq.TracerConsumer.handleDelivery(TracerConsumer.java:96) at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:111) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) at java.base/java.lang.Thread.run(Thread.java:1583) turnright-worker testing 17:38:31.192 2026-04-15 17:38:31.192 [pool-9-thread-9] INFO  [c.g.t.a.l.o.OrderTextAiCheckNotificationListener] - handleFilterResultRecordNotification-开始 notification={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.245.158","mqMsgSendTime":1776245520056,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435542153989980571,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"天安门城楼123","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} turnright-worker testing 17:38:31.201 2026-04-15 17:38:31.201 [pool-9-thread-9] ERROR [m.i.s.n.d.AmqpNotificationListenerManager] - 通知处理器抛出异常 logUUid=4c3028449623493eae062ce358c108f6, 队列=NOTIFICATION.V1.FilterResultRecordNotification#handleFilterResultRecordNotificationTurnWrightWorker, 消息体={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.245.158","mqMsgSendTime":1776245520056,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435542153989980571,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"天安门城楼123","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} org.springframework.dao.OptimisticLockingFailureException: Cannot save entity 435542993656087059 with version 1 to collection OrderTextAiCheck; Has it been modified meanwhile org.springframework.data.mongodb.core.MongoTemplate.doSaveVersioned(MongoTemplate.java:1509) at org.springframework.data.mongodb.core.MongoTemplate.save(MongoTemplate.java:1472) at org.springframework.data.mongodb.repository.support.SimpleMongoRepository.save(SimpleMongoRepository.java:99) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.data.repository.core.support.RepositoryMethodInvoker$RepositoryFragmentMethodInvoker.lambda$new$0(RepositoryMethodInvoker.java:277) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.doInvoke(RepositoryMethodInvoker.java:170) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.invoke(RepositoryMethodInvoker.java:158) at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:515) at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:284) at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:731) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:174) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:149) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:69) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.mongodb.repository.support.CrudMethodMetadataPostProcessor$CrudMethodMetadataPopulatingMethodInterceptor.invoke(CrudMethodMetadataPostProcessor.java:158) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) at jdk.proxy3/jdk.proxy3.$Proxy580.save(Unknown Source) at com.gongqibing.turnright.application.listener.order.OrderTextAiCheckNotificationListener.handleFilterResultRecordNotificationTurnWrightWorker(OrderTextAiCheckNotificationListener.java:284) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at com.gongqibing.turnright.application.web.idempotent.concurrent.ConcurrentInterceptor.invoke(ConcurrentInterceptor.java:89) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:728) at com.gongqibing.turnright.application.listener.order.OrderTextAiCheckNotificationListener$$SpringCGLIB$$0.handleFilterResultRecordNotificationTurnWrightWorker(<generated>) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at me.insidezhou.southernquiet.notification.driver.AmqpNotificationListenerManager.onMessageReceived(AmqpNotificationListenerManager.java:260) at me.insidezhou.southernquiet.notification.driver.AmqpNotificationListenerManager.lambda$generateMessageListener$6(AmqpNotificationListenerManager.java:211) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1694) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1616) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:114) at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:357) at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:246) at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:135) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) at org.springframework.amqp.rabbit.listener.$Proxy1380.invokeListener(Unknown Source) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1604) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1595) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1540) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1521) at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:1197) at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:1154) at org.apache.skywalking.apm.plugin.rabbitmq.TracerConsumer.handleDelivery(TracerConsumer.java:96) at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:111) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) at java.base/java.lang.Thread.run(Thread.java:1583) turnright-worker testing 17:38:31.205 2026-04-15 17:38:31.205 [pool-9-thread-13] INFO  [c.g.t.a.l.o.OrderTextAiCheckNotificationListener] - handleFilterResultRecordNotification-开始 notification={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.245.158","mqMsgSendTime":1776245520056,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435542153989980571,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"天安门城楼123","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} turnright-worker testing 17:38:31.216 2026-04-15 17:38:31.216 [pool-9-thread-13] ERROR [m.i.s.n.d.AmqpNotificationListenerManager] - 通知处理器抛出异常 logUUid=e14353a9a39142b4948c3c581f1e5e92, 队列=NOTIFICATION.V1.FilterResultRecordNotification#handleFilterResultRecordNotificationTurnWrightWorker, 消息体={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.245.158","mqMsgSendTime":1776245520056,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435542153989980571,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"天安门城楼123","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} org.springframework.dao.OptimisticLockingFailureException: Cannot save entity 435542993652417150 with version 1 to collection OrderTextAiCheck; Has it been modified meanwhile org.springframework.data.mongodb.core.MongoTemplate.doSaveVersioned(MongoTemplate.java:1509) at org.springframework.data.mongodb.core.MongoTemplate.save(MongoTemplate.java:1472) at org.springframework.data.mongodb.repository.support.SimpleMongoRepository.save(SimpleMongoRepository.java:99) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.data.repository.core.support.RepositoryMethodInvoker$RepositoryFragmentMethodInvoker.lambda$new$0(RepositoryMethodInvoker.java:277) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.doInvoke(RepositoryMethodInvoker.java:170) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.invoke(RepositoryMethodInvoker.java:158) at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:515) at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:284) at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:731) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:174) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:149) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:69) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:18

头像

从最新的日志可以清楚地看到问题的全貌。这是严重的消息重复和并发处理问题。让我详细分析并提供解决方案:

问题分析

1. 问题现象

  • 同一消息被多次重复投递
  • 每个消息都有两个消费者同时处理
  • 所有处理都因乐观锁冲突失败

2. 具体时间线

17:38:29.353 - Thread-32 开始处理消息 serialNo:435542927080424275
17:38:29.365 - Thread-28 开始处理同样的消息 (相差12毫秒)
17:38:29.362 - Thread-32 失败 (乐观锁)
17:38:29.373 - Thread-28 失败 (乐观锁)

17:38:30.655 - Thread-34 开始处理另一消息 serialNo:435533907649102714
17:38:30.664 - Thread-4 开始处理同样的消息 (相差9毫秒)
17:38:30.662 - Thread-34 失败
17:38:30.672 - Thread-4 失败

17:38:31.192 - Thread-9 开始处理消息 serialNo:435542153989980571
17:38:31.205 - Thread-13 开始处理同样的消息 (相差13毫秒)
17:38:31.201 - Thread-9 失败
17:38:31.216 - Thread-13 失败

3. 根本原因

  1. 消息被重复投递 - 生产者可能发送了多次
  2. RabbitMQ工作队列模式 - 多个消费者同时消费同一队列
  3. 缺乏幂等性处理 - 没有检查消息是否已处理
  4. 缺乏分布式锁 - 同一订单的消息被并发处理

完整解决方案

方案1:立即修复代码(最紧急)

JAVA
@Component
@Slf4j
public class OrderTextAiCheckNotificationListener {
    
    @Autowired
    private RedissonClient redissonClient;
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Autowired
    private MongoTemplate mongoTemplate;
    
    @Autowired
    private OrderTextAiCheckRepository orderTextAiCheckRepository;
    
    // 添加重试机制
    @Retryable(
        value = OptimisticLockingFailureException.class,
        maxAttempts = 3,
        backoff = @Backoff(delay = 100, multiplier = 2)
    )
    public void handleFilterResultRecordNotificationTurnWrightWorker(
            FilterResultRecordNotification notification) {
        
        String messageKey = generateMessageKey(notification);
        
        // 1. Redis幂等性检查(快速失败)
        if (isMessageProcessed(messageKey)) {
            log.info("⚠️ 消息已处理,跳过: orderId={}, serialNo={}", 
                    notification.getOrderId(), notification.getSerialNo());
            return;
        }
        
        // 2. 获取分布式锁,确保同一订单串行处理
        String lockKey = "order:process:" + notification.getOrderId();
        RLock lock = redissonClient.getLock(lockKey);
        
        try {
            // 尝试获取锁,等待3秒,锁持有10秒
            boolean acquired = lock.tryLock(3, 10, TimeUnit.SECONDS);
            
            if (!acquired) {
                log.warn("⚠️ 获取分布式锁失败,orderId={}", notification.getOrderId());
                // 抛出异常让消息重新入队
                throw new RuntimeException("系统繁忙,请稍后重试");
            }
            
            try {
                // 3. 数据库幂等性检查(双重检查)
                if (existsInDatabase(notification)) {
                    log.info("✅ 数据库已存在,跳过: orderId={}, serialNo={}", 
                            notification.getOrderId(), notification.getSerialNo());
                    // 标记为已处理,避免下次再查数据库
                    markMessageProcessed(messageKey);
                    return;
                }
                
                // 4. 处理消息(使用原子操作)
                processWithAtomicOperation(notification);
                
                // 5. 标记消息已处理
                markMessageProcessed(messageKey);
                
                log.info("✅ 消息处理成功: orderId={}, serialNo={}", 
                        notification.getOrderId(), notification.getSerialNo());
                
            } finally {
                // 确保锁被释放
                if (lock.isHeldByCurrentThread() && lock.isLocked()) {
                    lock.unlock();
                }
            }
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("❌ 处理被中断: orderId={}", notification.getOrderId(), e);
            throw new RuntimeException("处理被中断", e);
        } catch (Exception e) {
            log.error("❌ 处理失败: orderId={}, serialNo={}", 
                    notification.getOrderId(), notification.getSerialNo(), e);
            throw e; // 抛出异常,让消息重新入队
        }
    }
    
    /**
     * 使用MongoDB原子操作处理
     */
    private void processWithAtomicOperation(FilterResultRecordNotification notification) {
        Query query = new Query();
        query.addCriteria(Criteria.where("orderId").is(notification.getOrderId())
                .and("serialNo").is(notification.getSerialNo()));
        
        Update update = new Update();
        
        // 设置所有字段
        update.set("orderId", notification.getOrderId());
        update.set("serialNo", notification.getSerialNo());
        update.set("filterStatus", notification.getFilterStatus());
        update.set("filterSuccess", notification.isFilterSuccess());
        update.set("mqMsgFrom", notification.getMqMsgFrom());
        update.set("mqMsgSendTime", notification.getMqMsgSendTime());
        update.set("updateTime", new Date());
        
        // 设置列表字段(安全处理null)
        if (notification.getFilteredSubOrderContentList() != null) {
            update.set("filteredSubOrderContentList", 
                new ArrayList<>(notification.getFilteredSubOrderContentList()));
        }
        
        if (notification.getOriginalSubOrderContentList() != null) {
            update.set("originalSubOrderContentList", 
                new ArrayList<>(notification.getOriginalSubOrderContentList()));
        }
        
        // 设置字符串字段(安全处理null)
        update.set("filteredServiceRemark", 
            StringUtils.defaultString(notification.getFilteredServiceRemark()));
        update.set("filteredOwnerAddress", 
            StringUtils.defaultString(notification.getFilteredOwnerAddress()));
        update.set("filteredDeliveryAddress", 
            StringUtils.defaultString(notification.getFilteredDeliveryAddress()));
        update.set("originalServiceRemark", 
            StringUtils.defaultString(notification.getOriginalServiceRemark()));
        update.set("originalOwnerAddress", 
            StringUtils.defaultString(notification.getOriginalOwnerAddress()));
        update.set("originalDeliveryAddress", 
            StringUtils.defaultString(notification.getOriginalDeliveryAddress()));
        
        // 如果是插入操作,设置创建时间
        update.setOnInsert("createTime", new Date());
        update.setOnInsert("version", 0);
        
        // 原子操作:存在则更新,不存在则插入
        OrderTextAiCheck result = mongoTemplate.findAndModify(
            query,
            update,
            FindAndModifyOptions.options()
                .upsert(true)  // 如果不存在则插入
                .returnNew(true),  // 返回更新后的文档
            OrderTextAiCheck.class
        );
        
        if (result == null) {
            throw new RuntimeException("处理失败,未找到或创建记录");
        }
    }
    
    /**
     * 生成消息唯一键
     */
    private String generateMessageKey(FilterResultRecordNotification notification) {
        return String.format("msg:processed:order:%d:serial:%d", 
                notification.getOrderId(), notification.getSerialNo());
    }
    
    /**
     * 检查消息是否已处理
     */
    private boolean isMessageProcessed(String messageKey) {
        try {
            return Boolean.TRUE.equals(redisTemplate.hasKey(messageKey));
        } catch (Exception e) {
            log.warn("Redis检查失败,降级处理: key={}", messageKey, e);
            return false; // Redis异常时降级,继续处理
        }
    }
    
    /**
     * 标记消息已处理
     */
    private void markMessageProcessed(String messageKey) {
        try {
            // 设置24小时过期
            redisTemplate.opsForValue().set(messageKey, "1", 24, TimeUnit.HOURS);
        } catch (Exception e) {
            log.warn("Redis标记失败: key={}", messageKey, e);
            // Redis异常不影响主流程
        }
    }
    
    /**
     * 检查数据库中是否已存在
     */
    private boolean existsInDatabase(FilterResultRecordNotification notification) {
        try {
            return orderTextAiCheckRepository.existsByOrderIdAndSerialNo(
                notification.getOrderId(), notification.getSerialNo());
        } catch (Exception e) {
            log.warn("数据库检查失败,降级处理: orderId={}, serialNo={}", 
                    notification.getOrderId(), notification.getSerialNo(), e);
            return false; // 数据库异常时降级,继续处理
        }
    }
}

方案2:配置RabbitMQ(防止重复投递)

YAML
# application.yml
spring:
  rabbitmq:
    # 生产者配置
    publisher-confirms: true
    publisher-returns: true
    
    # 消费者配置
    listener:
      simple:
        acknowledge-mode: manual  # 手动确认
        prefetch: 1               # 每个消费者一次只取一条消息
        concurrency: 2            # 控制消费者数量
        max-concurrency: 4
        retry:
          enabled: false          # 禁用Spring的重试,使用RabbitMQ的重试
        default-requeue-rejected: false  # 不自动重新入队
        
    # 连接配置
    connection-timeout: 5000
JAVA
@Configuration
public class RabbitMQConfig {
    
    @Bean
    public Queue filterResultQueue() {
        Map<String, Object> args = new HashMap<>();
        
        // 死信队列配置
        args.put("x-dead-letter-exchange", "dlx.exchange");
        args.put("x-dead-letter-routing-key", "filter.result.dlx");
        
        // 消息TTL(1分钟)
        args.put("x-message-ttl", 60000);
        
        // 最大队列长度
        args.put("x-max-length", 10000);
        
        return new Queue(
            "NOTIFICATION.V1.FilterResultRecordNotification#handleFilterResultRecordNotificationTurnWrightWorker", 
            true, false, false, args
        );
    }
    
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("dlx.exchange");
    }
    
    @Bean
    public Queue dlxQueue() {
        return new Queue("filter.result.dlx.queue", true);
    }
    
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue())
            .to(dlxExchange())
            .with("filter.result.dlx");
    }
    
    @Bean
    public MessageConverter messageConverter() {
        // 使用Jackson2JsonMessageConverter,避免消息序列化问题
        return new Jackson2JsonMessageConverter();
    }
    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setPrefetchCount(1);  // 关键:每次只取一条消息
        factory.setConcurrentConsumers(2);
        factory.setMaxConcurrentConsumers(4);
        
        // 设置异常处理器
        factory.setErrorHandler(new ConditionalRejectingErrorHandler(
            new CustomExceptionStrategy()
        ));
        
        return factory;
    }
    
    // 自定义异常策略
    public static class CustomExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
        @Override
        public boolean isFatal(Throwable t) {
            // 乐观锁冲突不认为是致命错误,不关闭连接
            if (t.getCause() instanceof OptimisticLockingFailureException) {
                return false;
            }
            return super.isFatal(t);
        }
    }
}

方案3:手动确认消息

JAVA
@Component
@Slf4j
public class ManualAckMessageListener {
    
    @Autowired
    private OrderTextAiCheckNotificationListener notificationListener;
    
    @RabbitListener(
        queues = "NOTIFICATION.V1.FilterResultRecordNotification#handleFilterResultRecordNotificationTurnWrightWorker",
        containerFactory = "rabbitListenerContainerFactory"
    )
    public void onMessage(
            FilterResultRecordNotification notification,
            Message message,
            Channel channel) throws IOException {
        
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String messageId = message.getMessageProperties().getMessageId();
        
        try {
            // 处理消息
            notificationListener.handleFilterResultRecordNotificationTurnWrightWorker(notification);
            
            // 处理成功,手动确认
            channel.basicAck(deliveryTag, false);
            
            log.info("消息处理成功并确认: messageId={}, deliveryTag={}", 
                    messageId, deliveryTag);
            
        } catch (Exception e) {
            log.error("消息处理失败: messageId={}, deliveryTag={}", 
                    messageId, deliveryTag, e);
            
            // 判断是否重试
            if (shouldRetry(message, e)) {
                // 重新入队,最多重试3次
                if (getRetryCount(message) < 3) {
                    log.warn("消息重新入队: messageId={}, 重试次数={}", 
                            messageId, getRetryCount(message));
                    channel.basicNack(deliveryTag, false, true);
                } else {
                    // 超过重试次数,进入死信队列
                    log.error("超过最大重试次数,进入死信队列: messageId={}", messageId);
                    channel.basicNack(deliveryTag, false, false);
                }
            } else {
                // 不重试,直接确认(避免死循环)
                log.error("消息处理失败,不重试: messageId={}", messageId);
                channel.basicAck(deliveryTag, false);
            }
        }
    }
    
    private boolean shouldRetry(Message message, Exception e) {
        // 乐观锁冲突可以重试
        if (e.getCause() instanceof OptimisticLockingFailureException) {
            return true;
        }
        
        // 网络异常可以重试
        if (e instanceof AmqpException) {
            return true;
        }
        
        // 其他业务异常不重试
        return false;
    }
    
    private int getRetryCount(Message message) {
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        if (headers != null && headers.containsKey("x-death")) {
            List<Map<String, Object>> deaths = (List<Map<String, Object>>) headers.get("x-death");
            if (deaths != null && !deaths.isEmpty()) {
                return (int) deaths.get(0).get("count");
            }
        }
        return 0;
    }
}

方案4:消息轨迹追踪

JAVA
@Component
@Slf4j
public class MessageTraceService {
    
    @Autowired
    private MongoTemplate mongoTemplate;
    
    /**
     * 记录消息轨迹
     */
    public void traceMessage(String messageId, Long orderId, Long serialNo, 
                            String status, String error) {
        MessageTrace trace = MessageTrace.builder()
            .messageId(messageId)
            .orderId(orderId)
            .serialNo(serialNo)
            .status(status)
            .error(error)
            .processTime(new Date())
            .build();
        
        mongoTemplate.save(trace, "message_trace");
    }
    
    /**
     * 检查消息是否已成功处理
     */
    public boolean isMessageSuccessfullyProcessed(Long orderId, Long serialNo) {
        Query query = new Query(Criteria.where("orderId").is(orderId)
                .and("serialNo").is(serialNo)
                .and("status").is("SUCCESS"));
        
        return mongoTemplate.exists(query, MessageTrace.class);
    }
}

@Data
@Builder
@Document(collection = "message_trace")
class MessageTrace {
    @Id
    private String id;
    
    private String messageId;
    private Long orderId;
    private Long serialNo;
    private String status; // RECEIVED, PROCESSING, SUCCESS, FAILED
    private String error;
    private Date processTime;
}

方案5:监控和告警

JAVA
@Component
@Slf4j
public class MessageMonitor {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    // 监控指标
    private final Map<String, AtomicInteger> metrics = new ConcurrentHashMap<>();
    
    @PostConstruct
    public void init() {
        metrics.put("total_messages", new AtomicInteger(0));
        metrics.put("success_messages", new AtomicInteger(0));
        metrics.put("failed_messages", new AtomicInteger(0));
        metrics.put("duplicate_messages", new AtomicInteger(0));
        metrics.put("lock_timeout_messages", new AtomicInteger(0));
        
        // 定时上报监控指标
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::reportMetrics, 1, 1, TimeUnit.MINUTES);
    }
    
    /**
     * 记录消息处理指标
     */
    public void recordMetric(String metricName) {
        AtomicInteger counter = metrics.get(metricName);
        if (counter != null) {
            counter.incrementAndGet();
        }
        
        // 实时告警
        if ("failed_messages".equals(metricName)) {
            checkAndAlert();
        }
    }
    
    /**
     * 检查并告警
     */
    private void checkAndAlert() {
        int failedCount = metrics.get("failed_messages").get();
        int totalCount = metrics.get("total_messages").get();
        
        if (totalCount > 0) {
            double failureRate = (double) failedCount / totalCount;
            
            if (failureRate > 0.1) { // 失败率超过10%
                log.error("⚠️ 消息处理失败率过高: {}/{} ({}%)", 
                        failedCount, totalCount, failureRate * 100);
                
                // 发送告警通知
                sendAlert("消息处理失败率过高", 
                        String.format("失败率: %.2f%%,请立即检查!", failureRate * 100));
            }
        }
    }
    
    private void reportMetrics() {
        log.info("📊 消息处理监控指标:");
        metrics.forEach((key, value) -> {
            log.info("  {}: {}", key, value.get());
        });
        
        // 重置计数器
        metrics.values().forEach(counter -> counter.set(0));
    }
    
    private void sendAlert(String title, String message) {
        // 实现告警发送逻辑(邮件、钉钉、企业微信等)
    }
}

实施步骤

第一阶段:立即修复(今天)

  1. 在现有代码中添加Redis幂等性检查
  2. 添加Redisson分布式锁
  3. 将Repository.save()改为MongoTemplate.findAndModify()

第二阶段:配置优化(1-2天)

  1. 配置RabbitMQ死信队列
  2. 配置消息TTL和最大重试次数
  3. 实现手动确认模式

第三阶段:监控完善(3-5天)

  1. 添加消息轨迹追踪
  2. 实现监控告警
  3. 添加仪表板展示关键指标

关键配置

1. Redis配置

YAML
spring:
  redis:
    host: ${REDIS_HOST:localhost}
    port: ${REDIS_PORT:6379}
    password: ${REDIS_PASSWORD:}
    database: 0
    timeout: 3000ms
    lettuce:
      pool:
        max-active: 8
        max-wait: -1ms
        max-idle: 8
        min-idle: 0

2. Redisson配置

YAML
redisson:
  address: "redis://${spring.redis.host}:${spring.redis.port}"
  password: ${spring.redis.password}
  database: ${spring.redis.database:0}
  timeout: ${spring.redis.timeout:3000}
  connection-pool-size: ${spring.redis.lettuce.pool.max-active:8}

3. MongoDB配置

YAML
spring:
  data:
    mongodb:
      uri: mongodb://${MONGO_HOST:localhost}:${MONGO_PORT:27017}/turnright
      auto-index-creation: true

创建MongoDB索引

JAVASCRIPT
// 创建唯一索引,防止重复数据
db.OrderTextAiCheck.createIndex(
  { orderId: 1, serialNo: 1 },
  { unique: true, name: "uk_order_serial" }
);

// 创建查询索引
db.OrderTextAiCheck.createIndex({ orderId: 1 });
db.OrderTextAiCheck.createIndex({ serialNo: 1 });
db.OrderTextAiCheck.createIndex({ createTime: -1 });

这个完整的解决方案能彻底解决:

  1. ✅ 消息重复投递问题
  2. ✅ 并发处理冲突问题
  3. ✅ 乐观锁异常问题
  4. ✅ 系统可靠性问题
  5. ✅ 监控和可观测性问题
头像

消息被重复投递 - 生产者可能发送了多次?这是根本原因吗?

头像

你说得非常对!生产者发送多次不是根本原因,而是表象。让我帮你深入分析真正的根本原因。

真正的根本原因分析

1. RabbitMQ消息确认机制问题

从日志可以看出,系统使用的是自动确认模式(Auto Ack),这是最大的问题:

JAVA
// 你的系统很可能是这样配置的
@RabbitListener(queues = "your.queue")
public void handleMessage(Message message) {
    // 处理消息...
    // 没有手动ack,使用默认的自动确认
}

自动确认的问题

  • 消息一旦被推送给消费者,RabbitMQ立即标记为已传递
  • 如果消费者处理失败,消息已经丢失(不会重试)
  • 但生产者可能因为没收到确认而重发消息

2. 生产者确认机制缺失

生产者发送消息后:

JAVA
// 没有启用发布者确认
rabbitTemplate.convertAndSend(exchange, routingKey, message);
// 发送后不等待确认,不知道消息是否成功到达RabbitMQ

3. 消息持久化问题

消息可能没有持久化:

JAVA
// 消息可能没有设置持久化
MessageProperties props = new MessageProperties();
// props.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 缺少这行
Message message = new Message(body.getBytes(), props);

4. 网络问题导致重发

生产者 → 网络抖动 → RabbitMQ(消息可能已收到)
    ↓
生产者没收到确认 → 认为发送失败 → 重发
    ↓
RabbitMQ收到两条相同的消息

根本原因验证

让我们通过日志来验证:

观察1:相同消息的细微差别

第一个消息:
"mqMsgFrom":"metadata-config-center@test-k8s-192.168.18.238"
"mqMsgSendTime":1776245880074

第二个消息(完全相同):
"mqMsgFrom":"metadata-config-center@test-k8s-192.168.18.238"  
"mqMsgSendTime":1776245880074

完全相同的内容,但被两个消费者处理 → 说明是两条独立的消息,不是同一个消息被两个消费者处理。

观察2:时间戳分析

17:38:29.353 - 线程32开始处理
17:38:29.365 - 线程28开始处理
相差12毫秒

如果是同一个消息被两个消费者竞争,时间差应该在微秒级别。12毫秒的差异说明这是两条独立的消息

观察3:实体ID不同

线程32: entity 435542989361119823
线程28: entity 435542989361119824

不同的实体ID,说明两个消费者创建了两个不同的实体对象,然后都尝试保存到同一个订单+serialNo的记录。

真正的根本原因链条

生产者发送消息
    ↓
RabbitMQ收到并存储
    ↓
消费者1获取消息(自动确认)
    ↓
消费者1处理中(还未完成)
    ↓
生产者没收到确认,超时重发
    ↓
RabbitMQ收到重复消息
    ↓  
消费者2获取重复消息
    ↓
两个消费者同时处理 → 乐观锁冲突

解决方案(从根上解决)

方案1:修复消息确认机制(最紧急)

YAML
# application.yml
spring:
  rabbitmq:
    # 1. 启用生产者确认
    publisher-confirm-type: correlated
    publisher-returns: true
    
    # 2. 配置消费者手动确认
    listener:
      simple:
        acknowledge-mode: manual  # 关键:改为手动确认
        prefetch: 1
        concurrency: 1
        max-concurrency: 2
        
    # 3. 消息持久化
    template:
      mandatory: true
JAVA
@Component
@Slf4j
public class ReliableMessageProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    private final Map<String, CompletableFuture<Boolean>> confirmMap = new ConcurrentHashMap<>();
    
    @PostConstruct
    public void init() {
        // 设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (correlationData != null) {
                String messageId = correlationData.getId();
                CompletableFuture<Boolean> future = confirmMap.remove(messageId);
                if (future != null) {
                    future.complete(ack);
                }
                if (!ack) {
                    log.error("消息发送失败: messageId={}, cause={}", messageId, cause);
                }
            }
        });
        
        // 设置返回回调(消息不可达时)
        rabbitTemplate.setReturnsCallback(returned -> {
            log.error("消息不可达: exchange={}, routingKey={}, replyCode={}, replyText={}",
                    returned.getExchange(), returned.getRoutingKey(),
                    returned.getReplyCode(), returned.getReplyText());
        });
    }
    
    /**
     * 可靠的消息发送
     */
    public boolean sendReliably(String exchange, String routingKey, Object message) {
        String messageId = UUID.randomUUID().toString();
        
        // 构建消息
        MessageProperties props = new MessageProperties();
        props.setMessageId(messageId);
        props.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 持久化
        props.setContentType("application/json");
        props.setTimestamp(new Date());
        
        Message amqpMessage = new Message(
            new ObjectMapper().writeValueAsBytes(message), 
            props
        );
        
        // 等待确认
        CompletableFuture<Boolean> confirmFuture = new CompletableFuture<>();
        confirmMap.put(messageId, confirmFuture);
        
        // 发送消息
        rabbitTemplate.convertAndSend(exchange, routingKey, amqpMessage, 
            new CorrelationData(messageId));
        
        try {
            // 等待确认,超时3秒
            Boolean ack = confirmFuture.get(3, TimeUnit.SECONDS);
            if (ack) {
                log.info("消息发送成功: messageId={}", messageId);
                return true;
            } else {
                log.error("消息发送失败: messageId={}", messageId);
                return false;
            }
        } catch (TimeoutException e) {
            log.error("消息确认超时: messageId={}", messageId);
            return false;
        } catch (Exception e) {
            log.error("消息发送异常: messageId={}", messageId, e);
            return false;
        }
    }
}

方案2:消费者手动确认

JAVA
@Component
@Slf4j
public class ReliableMessageConsumer {
    
    @Autowired
    private OrderTextAiCheckService orderTextAiCheckService;
    
    @RabbitListener(
        queues = "NOTIFICATION.V1.FilterResultRecordNotification#handleFilterResultRecordNotificationTurnWrightWorker",
        ackMode = "MANUAL"  // 手动确认
    )
    public void handleMessage(
            FilterResultRecordNotification notification,
            Message message,
            Channel channel) throws IOException {
        
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String messageId = message.getMessageProperties().getMessageId();
        
        try {
            // 1. 幂等性检查
            if (orderTextAiCheckService.isMessageProcessed(
                notification.getOrderId(), notification.getSerialNo())) {
                log.info("消息已处理,直接确认: messageId={}", messageId);
                channel.basicAck(deliveryTag, false);
                return;
            }
            
            // 2. 处理消息
            orderTextAiCheckService.processNotification(notification);
            
            // 3. 处理成功,手动确认
            channel.basicAck(deliveryTag, false);
            
            log.info("消息处理成功: messageId={}", messageId);
            
        } catch (Exception e) {
            log.error("消息处理失败: messageId={}", messageId, e);
            
            // 4. 处理失败,判断是否重试
            if (shouldRetry(message, e)) {
                // 拒绝消息,重新入队
                channel.basicNack(deliveryTag, false, true);
                log.warn("消息重新入队: messageId={}", messageId);
            } else {
                // 拒绝消息,不重新入队(进入死信队列)
                channel.basicNack(deliveryTag, false, false);
                log.error("消息进入死信队列: messageId={}", messageId);
            }
        }
    }
    
    private boolean shouldRetry(Message message, Exception e) {
        // 乐观锁冲突可以重试
        if (e.getCause() instanceof OptimisticLockingFailureException) {
            return true;
        }
        
        // 网络异常可以重试
        if (e instanceof IOException || e instanceof AmqpException) {
            return true;
        }
        
        // 获取重试次数
        int retryCount = getRetryCount(message);
        if (retryCount >= 3) {
            log.warn("超过最大重试次数: {}", retryCount);
            return false;
        }
        
        return true;
    }
    
    private int getRetryCount(Message message) {
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        if (headers != null && headers.containsKey("x-death")) {
            List<Map<String, Object>> deaths = (List<Map<String, Object>>) headers.get("x-death");
            if (deaths != null && !deaths.isEmpty()) {
                return (int) deaths.get(0).get("count");
            }
        }
        return 0;
    }
}

方案3:生产者端幂等性

JAVA
@Component
@Slf4j
public class IdempotentMessageProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    /**
     * 幂等的消息发送
     */
    public boolean sendIdempotent(String exchange, String routingKey, 
                                 FilterResultRecordNotification notification) {
        
        // 生成消息唯一键
        String messageKey = generateMessageKey(notification);
        
        // 检查是否已发送
        if (isMessageSent(messageKey)) {
            log.warn("消息已发送,跳过: orderId={}, serialNo={}", 
                    notification.getOrderId(), notification.getSerialNo());
            return true;
        }
        
        // 发送消息
        boolean sent = sendReliably(exchange, routingKey, notification);
        
        if (sent) {
            // 标记为已发送
            markMessageSent(messageKey);
        }
        
        return sent;
    }
    
    private String generateMessageKey(FilterResultRecordNotification notification) {
        return String.format("sent:order:%d:serial:%d", 
                notification.getOrderId(), notification.getSerialNo());
    }
    
    private boolean isMessageSent(String messageKey) {
        return Boolean.TRUE.equals(redisTemplate.hasKey(messageKey));
    }
    
    private void markMessageSent(String messageKey) {
        // 设置较短的过期时间,比如5分钟
        redisTemplate.opsForValue().set(messageKey, "1", 5, TimeUnit.MINUTES);
    }
}

方案4:完整的可靠消息方案

JAVA
@Configuration
@Slf4j
public class ReliableMessagingConfig {
    
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        
        // 开启发布者确认
        factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        factory.setPublisherReturns(true);
        
        return factory;
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        
        // 启用强制标志(消息不可达时返回)
        template.setMandatory(true);
        
        // 设置消息转换器
        template.setMessageConverter(jsonMessageConverter());
        
        // 配置重试
        RetryTemplate retryTemplate = new RetryTemplate();
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        retryTemplate.setRetryPolicy(retryPolicy);
        
        template.setRetryTemplate(retryTemplate);
        
        return template;
    }
    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        
        // 手动确认模式
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        
        // 每次只取一条消息
        factory.setPrefetchCount(1);
        
        // 并发消费者
        factory.setConcurrentConsumers(2);
        factory.setMaxConcurrentConsumers(4);
        
        // 消息转换器
        factory.setMessageConverter(jsonMessageConverter());
        
        // 设置异常处理器
        factory.setErrorHandler(errorHandler());
        
        return factory;
    }
    
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    
    @Bean
    public ErrorHandler errorHandler() {
        return new ConditionalRejectingErrorHandler(
            new FatalExceptionStrategy() {
                @Override
                public boolean isFatal(Throwable t) {
                    // 只有致命异常才拒绝消息
                    return t instanceof MessageConversionException 
                        || t instanceof org.springframework.amqp.support.converter.MessageConversionException
                        || t instanceof IllegalArgumentException;
                }
            }
        );
    }
}

诊断工具:检查当前配置

JAVA
@Component
public class RabbitMQConfigChecker {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @EventListener(ApplicationReadyEvent.class)
    public void checkConfig() {
        log.info("=== RabbitMQ配置检查 ===");
        
        ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();
        if (connectionFactory instanceof CachingConnectionFactory) {
            CachingConnectionFactory ccf = (CachingConnectionFactory) connectionFactory;
            log.info("发布者确认: {}", ccf.isPublisherConfirmType());
            log.info("发布者返回: {}", ccf.isPublisherReturns());
        }
        
        // 检查队列配置
        checkQueueConfig();
    }
    
    private void checkQueueConfig() {
        try {
            Properties queueProperties = rabbitTemplate.execute(channel -> {
                return channel.getQueueProperties(
                    "NOTIFICATION.V1.FilterResultRecordNotification#handleFilterResultRecordNotificationTurnWrightWorker"
                );
            });
            
            if (queueProperties != null) {
                log.info("队列消费者数量: {}", queueProperties.get("QUEUE_CONSUMER_COUNT"));
                log.info("队列消息数量: {}", queueProperties.get("QUEUE_MESSAGE_COUNT"));
            }
        } catch (Exception e) {
            log.error("检查队列配置失败", e);
        }
    }
}

实施优先级

第一阶段:立即实施(今天)

  1. 将消费者改为手动确认模式(最重要)
  2. 添加生产者确认机制
  3. 启用消息持久化

第二阶段:短期优化(1-2天)

  1. 实现生产者端幂等性
  2. 配置死信队列和重试策略
  3. 添加消息轨迹追踪

第三阶段:长期加固(1周内)

  1. 实现消息审计和监控
  2. 添加告警机制
  3. 压力测试和性能优化

根本原因总结

真正的根本原因是:消息确认机制不完善

  1. 生产者:发送后不确认是否成功,可能重复发送
  2. RabbitMQ:自动确认模式下,消息一旦推送就认为成功
  3. 消费者:处理失败时消息已丢失,无法重试

解决方案的核心

  • 生产者:发送确认 + 幂等性
  • RabbitMQ:手动确认 + 持久化
  • 消费者:幂等处理 + 错误重试

这样就能从根上解决消息重复和丢失的问题。

全部评论: 0

    我有话说: