17:38:29.353 2026-04-15 17:38:29.353 [[32mpool-9-thread-32[0;39m] [34mINFO [0;39m [[1;35mc.g.t.a.l.o.OrderTextAiCheckNotificationListener[0;39m] - handleFilterResultRecordNotification-开始 notification={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.18.238","mqMsgSendTime":1776245880074,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435542927080424275,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"天安门城楼123","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} turnright-worker testing 17:38:29.362 2026-04-15 17:38:29.362 [[32mpool-9-thread-32[0;39m] [1;31mERROR[0;39m [[1;35mm.i.s.n.d.AmqpNotificationListenerManager[0;39m] - 通知处理器抛出异常 logUUid=8734bd88b8ae4de3af8c578367248e8c, 队列=NOTIFICATION.V1.FilterResultRecordNotification#handleFilterResultRecordNotificationTurnWrightWorker, 消息体={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.18.238","mqMsgSendTime":1776245880074,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435542927080424275,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"天安门城楼123","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} org.springframework.dao.OptimisticLockingFailureException: Cannot save entity 435542989361119823 with version 1 to collection OrderTextAiCheck; Has it been modified meanwhile 
org.springframework.data.mongodb.core.MongoTemplate.doSaveVersioned(MongoTemplate.java:1509) at org.springframework.data.mongodb.core.MongoTemplate.save(MongoTemplate.java:1472) at org.springframework.data.mongodb.repository.support.SimpleMongoRepository.save(SimpleMongoRepository.java:99) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.data.repository.core.support.RepositoryMethodInvoker$RepositoryFragmentMethodInvoker.lambda$new$0(RepositoryMethodInvoker.java:277) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.doInvoke(RepositoryMethodInvoker.java:170) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.invoke(RepositoryMethodInvoker.java:158) at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:515) at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:284) at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:731) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:174) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:149) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:69) at 
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.mongodb.repository.support.CrudMethodMetadataPostProcessor$CrudMethodMetadataPopulatingMethodInterceptor.invoke(CrudMethodMetadataPostProcessor.java:158) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) at jdk.proxy3/jdk.proxy3.$Proxy580.save(Unknown Source) at com.gongqibing.turnright.application.listener.order.OrderTextAiCheckNotificationListener.handleFilterResultRecordNotificationTurnWrightWorker(OrderTextAiCheckNotificationListener.java:284) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at com.gongqibing.turnright.application.web.idempotent.concurrent.ConcurrentInterceptor.invoke(ConcurrentInterceptor.java:89) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:728) at com.gongqibing.turnright.application.listener.order.OrderTextAiCheckNotificationListener$$SpringCGLIB$$0.handleFilterResultRecordNotificationTurnWrightWorker(<generated>) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at 
me.insidezhou.southernquiet.notification.driver.AmqpNotificationListenerManager.onMessageReceived(AmqpNotificationListenerManager.java:260) at me.insidezhou.southernquiet.notification.driver.AmqpNotificationListenerManager.lambda$generateMessageListener$6(AmqpNotificationListenerManager.java:211) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1694) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1616) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:114) at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:357) at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:246) at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:135) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) at org.springframework.amqp.rabbit.listener.$Proxy1380.invokeListener(Unknown Source) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1604) at 
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1595) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1540) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1521) at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:1197) at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:1154) at org.apache.skywalking.apm.plugin.rabbitmq.TracerConsumer.handleDelivery(TracerConsumer.java:96) at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:111) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) at java.base/java.lang.Thread.run(Thread.java:1583) turnright-worker testing 17:38:29.365 2026-04-15 17:38:29.365 [[32mpool-9-thread-28[0;39m] [34mINFO [0;39m [[1;35mc.g.t.a.l.o.OrderTextAiCheckNotificationListener[0;39m] - handleFilterResultRecordNotification-开始 
notification={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.18.238","mqMsgSendTime":1776245880074,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435542927080424275,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"天安门城楼123","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} turnright-worker testing 17:38:29.373 2026-04-15 17:38:29.373 [[32mpool-9-thread-28[0;39m] [1;31mERROR[0;39m [[1;35mm.i.s.n.d.AmqpNotificationListenerManager[0;39m] - 通知处理器抛出异常 logUUid=44f374fd671643bdb74d4bd6052cbcf4, 队列=NOTIFICATION.V1.FilterResultRecordNotification#handleFilterResultRecordNotificationTurnWrightWorker, 消息体={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.18.238","mqMsgSendTime":1776245880074,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435542927080424275,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"天安门城楼123","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} org.springframework.dao.OptimisticLockingFailureException: Cannot save entity 435542989361119824 with version 1 to collection OrderTextAiCheck; Has it been modified meanwhile org.springframework.data.mongodb.core.MongoTemplate.doSaveVersioned(MongoTemplate.java:1509) at org.springframework.data.mongodb.core.MongoTemplate.save(MongoTemplate.java:1472) at 
org.springframework.data.mongodb.repository.support.SimpleMongoRepository.save(SimpleMongoRepository.java:99) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.data.repository.core.support.RepositoryMethodInvoker$RepositoryFragmentMethodInvoker.lambda$new$0(RepositoryMethodInvoker.java:277) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.doInvoke(RepositoryMethodInvoker.java:170) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.invoke(RepositoryMethodInvoker.java:158) at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:515) at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:284) at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:731) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:174) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:149) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:69) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at 
org.springframework.data.mongodb.repository.support.CrudMethodMetadataPostProcessor$CrudMethodMetadataPopulatingMethodInterceptor.invoke(CrudMethodMetadataPostProcessor.java:158) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) at jdk.proxy3/jdk.proxy3.$Proxy580.save(Unknown Source) at com.gongqibing.turnright.application.listener.order.OrderTextAiCheckNotificationListener.handleFilterResultRecordNotificationTurnWrightWorker(OrderTextAiCheckNotificationListener.java:284) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at com.gongqibing.turnright.application.web.idempotent.concurrent.ConcurrentInterceptor.invoke(ConcurrentInterceptor.java:89) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:728) at com.gongqibing.turnright.application.listener.order.OrderTextAiCheckNotificationListener$$SpringCGLIB$$0.handleFilterResultRecordNotificationTurnWrightWorker(<generated>) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at me.insidezhou.southernquiet.notification.driver.AmqpNotificationListenerManager.onMessageReceived(AmqpNotificationListenerManager.java:260) at 
me.insidezhou.southernquiet.notification.driver.AmqpNotificationListenerManager.lambda$generateMessageListener$6(AmqpNotificationListenerManager.java:211) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1694) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1616) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:114) at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:357) at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:246) at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:135) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) at org.springframework.amqp.rabbit.listener.$Proxy1380.invokeListener(Unknown Source) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1604) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1595) at 
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1540) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1521) at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:1197) at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:1154) at org.apache.skywalking.apm.plugin.rabbitmq.TracerConsumer.handleDelivery(TracerConsumer.java:96) at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:111) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) at java.base/java.lang.Thread.run(Thread.java:1583) turnright-worker testing 17:38:30.656 2026-04-15 17:38:30.655 [[32mpool-9-thread-34[0;39m] [34mINFO [0;39m [[1;35mc.g.t.a.l.o.OrderTextAiCheckNotificationListener[0;39m] - handleFilterResultRecordNotification-开始 notification={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.245.158","mqMsgSendTime":1776241693242,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435533907649102714,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} turnright-worker testing 
17:38:30.662 2026-04-15 17:38:30.662 [[32mpool-9-thread-34[0;39m] [1;31mERROR[0;39m [[1;35mm.i.s.n.d.AmqpNotificationListenerManager[0;39m] - 通知处理器抛出异常 logUUid=f783d1293f574d9b965dee04a73be3e7, 队列=NOTIFICATION.V1.FilterResultRecordNotification#handleFilterResultRecordNotificationTurnWrightWorker, 消息体={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.245.158","mqMsgSendTime":1776241693242,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435533907649102714,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} org.springframework.dao.OptimisticLockingFailureException: Cannot save entity 435542991508603396 with version 1 to collection OrderTextAiCheck; Has it been modified meanwhile org.springframework.data.mongodb.core.MongoTemplate.doSaveVersioned(MongoTemplate.java:1509) at org.springframework.data.mongodb.core.MongoTemplate.save(MongoTemplate.java:1472) at org.springframework.data.mongodb.repository.support.SimpleMongoRepository.save(SimpleMongoRepository.java:99) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.data.repository.core.support.RepositoryMethodInvoker$RepositoryFragmentMethodInvoker.lambda$new$0(RepositoryMethodInvoker.java:277) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.doInvoke(RepositoryMethodInvoker.java:170) at 
org.springframework.data.repository.core.support.RepositoryMethodInvoker.invoke(RepositoryMethodInvoker.java:158) at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:515) at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:284) at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:731) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:174) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:149) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:69) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.mongodb.repository.support.CrudMethodMetadataPostProcessor$CrudMethodMetadataPopulatingMethodInterceptor.invoke(CrudMethodMetadataPostProcessor.java:158) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) at jdk.proxy3/jdk.proxy3.$Proxy580.save(Unknown Source) at com.gongqibing.turnright.application.listener.order.OrderTextAiCheckNotificationListener.handleFilterResultRecordNotificationTurnWrightWorker(OrderTextAiCheckNotificationListener.java:284) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at 
java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at com.gongqibing.turnright.application.web.idempotent.concurrent.ConcurrentInterceptor.invoke(ConcurrentInterceptor.java:89) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:728) at com.gongqibing.turnright.application.listener.order.OrderTextAiCheckNotificationListener$$SpringCGLIB$$0.handleFilterResultRecordNotificationTurnWrightWorker(<generated>) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at me.insidezhou.southernquiet.notification.driver.AmqpNotificationListenerManager.onMessageReceived(AmqpNotificationListenerManager.java:260) at me.insidezhou.southernquiet.notification.driver.AmqpNotificationListenerManager.lambda$generateMessageListener$6(AmqpNotificationListenerManager.java:211) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1694) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1616) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at 
org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:114) at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:357) at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:246) at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:135) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) at org.springframework.amqp.rabbit.listener.$Proxy1380.invokeListener(Unknown Source) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1604) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1595) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1540) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1521) at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:1197) at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:1154) at org.apache.skywalking.apm.plugin.rabbitmq.TracerConsumer.handleDelivery(TracerConsumer.java:96) at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) at 
com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:111) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) at java.base/java.lang.Thread.run(Thread.java:1583) turnright-worker testing 17:38:30.664 2026-04-15 17:38:30.664 [[32mpool-9-thread-4[0;39m] [34mINFO [0;39m [[1;35mc.g.t.a.l.o.OrderTextAiCheckNotificationListener[0;39m] - handleFilterResultRecordNotification-开始 notification={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.245.158","mqMsgSendTime":1776241693242,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435533907649102714,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} turnright-worker testing 17:38:30.672 2026-04-15 17:38:30.672 [[32mpool-9-thread-4[0;39m] [1;31mERROR[0;39m [[1;35mm.i.s.n.d.AmqpNotificationListenerManager[0;39m] - 通知处理器抛出异常 logUUid=12042b1392364e39bdfbb1e575366ee9, 队列=NOTIFICATION.V1.FilterResultRecordNotification#handleFilterResultRecordNotificationTurnWrightWorker, 
消息体={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.245.158","mqMsgSendTime":1776241693242,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435533907649102714,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} org.springframework.dao.OptimisticLockingFailureException: Cannot save entity 435542991508603397 with version 1 to collection OrderTextAiCheck; Has it been modified meanwhile org.springframework.data.mongodb.core.MongoTemplate.doSaveVersioned(MongoTemplate.java:1509) at org.springframework.data.mongodb.core.MongoTemplate.save(MongoTemplate.java:1472) at org.springframework.data.mongodb.repository.support.SimpleMongoRepository.save(SimpleMongoRepository.java:99) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.data.repository.core.support.RepositoryMethodInvoker$RepositoryFragmentMethodInvoker.lambda$new$0(RepositoryMethodInvoker.java:277) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.doInvoke(RepositoryMethodInvoker.java:170) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.invoke(RepositoryMethodInvoker.java:158) at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:515) at 
org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:284) at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:731) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:174) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:149) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:69) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.mongodb.repository.support.CrudMethodMetadataPostProcessor$CrudMethodMetadataPopulatingMethodInterceptor.invoke(CrudMethodMetadataPostProcessor.java:158) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) at jdk.proxy3/jdk.proxy3.$Proxy580.save(Unknown Source) at com.gongqibing.turnright.application.listener.order.OrderTextAiCheckNotificationListener.handleFilterResultRecordNotificationTurnWrightWorker(OrderTextAiCheckNotificationListener.java:284) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at 
org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at com.gongqibing.turnright.application.web.idempotent.concurrent.ConcurrentInterceptor.invoke(ConcurrentInterceptor.java:89) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:728) at com.gongqibing.turnright.application.listener.order.OrderTextAiCheckNotificationListener$$SpringCGLIB$$0.handleFilterResultRecordNotificationTurnWrightWorker(<generated>) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at me.insidezhou.southernquiet.notification.driver.AmqpNotificationListenerManager.onMessageReceived(AmqpNotificationListenerManager.java:260) at me.insidezhou.southernquiet.notification.driver.AmqpNotificationListenerManager.lambda$generateMessageListener$6(AmqpNotificationListenerManager.java:211) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1694) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1616) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at 
org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:114) at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:357) at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:246) at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:135) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) at org.springframework.amqp.rabbit.listener.$Proxy1380.invokeListener(Unknown Source) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1604) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1595) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1540) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1521) at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:1197) at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:1154) at org.apache.skywalking.apm.plugin.rabbitmq.TracerConsumer.handleDelivery(TracerConsumer.java:96) at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:111) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) at java.base/java.lang.Thread.run(Thread.java:1583) turnright-worker testing 17:38:31.192 2026-04-15 17:38:31.192 [[32mpool-9-thread-9[0;39m] [34mINFO [0;39m [[1;35mc.g.t.a.l.o.OrderTextAiCheckNotificationListener[0;39m] - handleFilterResultRecordNotification-开始 notification={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.245.158","mqMsgSendTime":1776245520056,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435542153989980571,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"天安门城楼123","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} turnright-worker testing 17:38:31.201 2026-04-15 17:38:31.201 [[32mpool-9-thread-9[0;39m] [1;31mERROR[0;39m [[1;35mm.i.s.n.d.AmqpNotificationListenerManager[0;39m] - 通知处理器抛出异常 logUUid=4c3028449623493eae062ce358c108f6, 队列=NOTIFICATION.V1.FilterResultRecordNotification#handleFilterResultRecordNotificationTurnWrightWorker, 消息体={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.245.158","mqMsgSendTime":1776245520056,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435542153989980571,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"天安门城楼123","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} 
org.springframework.dao.OptimisticLockingFailureException: Cannot save entity 435542993656087059 with version 1 to collection OrderTextAiCheck; Has it been modified meanwhile org.springframework.data.mongodb.core.MongoTemplate.doSaveVersioned(MongoTemplate.java:1509) at org.springframework.data.mongodb.core.MongoTemplate.save(MongoTemplate.java:1472) at org.springframework.data.mongodb.repository.support.SimpleMongoRepository.save(SimpleMongoRepository.java:99) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.data.repository.core.support.RepositoryMethodInvoker$RepositoryFragmentMethodInvoker.lambda$new$0(RepositoryMethodInvoker.java:277) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.doInvoke(RepositoryMethodInvoker.java:170) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.invoke(RepositoryMethodInvoker.java:158) at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:515) at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:284) at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:731) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:174) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:149) at 
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:69) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.mongodb.repository.support.CrudMethodMetadataPostProcessor$CrudMethodMetadataPopulatingMethodInterceptor.invoke(CrudMethodMetadataPostProcessor.java:158) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) at jdk.proxy3/jdk.proxy3.$Proxy580.save(Unknown Source) at com.gongqibing.turnright.application.listener.order.OrderTextAiCheckNotificationListener.handleFilterResultRecordNotificationTurnWrightWorker(OrderTextAiCheckNotificationListener.java:284) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at com.gongqibing.turnright.application.web.idempotent.concurrent.ConcurrentInterceptor.invoke(ConcurrentInterceptor.java:89) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:728) at com.gongqibing.turnright.application.listener.order.OrderTextAiCheckNotificationListener$$SpringCGLIB$$0.handleFilterResultRecordNotificationTurnWrightWorker(<generated>) at 
java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at me.insidezhou.southernquiet.notification.driver.AmqpNotificationListenerManager.onMessageReceived(AmqpNotificationListenerManager.java:260) at me.insidezhou.southernquiet.notification.driver.AmqpNotificationListenerManager.lambda$generateMessageListener$6(AmqpNotificationListenerManager.java:211) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1694) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1616) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:114) at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:357) at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:246) at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:135) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223) at org.springframework.amqp.rabbit.listener.$Proxy1380.invokeListener(Unknown Source) at 
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1604) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1595) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1540) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1521) at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:1197) at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:1154) at org.apache.skywalking.apm.plugin.rabbitmq.TracerConsumer.handleDelivery(TracerConsumer.java:96) at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:111) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) at java.base/java.lang.Thread.run(Thread.java:1583) turnright-worker testing 17:38:31.205 2026-04-15 17:38:31.205 [[32mpool-9-thread-13[0;39m] [34mINFO [0;39m [[1;35mc.g.t.a.l.o.OrderTextAiCheckNotificationListener[0;39m] - handleFilterResultRecordNotification-开始 
notification={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.245.158","mqMsgSendTime":1776245520056,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435542153989980571,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"天安门城楼123","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} turnright-worker testing 17:38:31.216 2026-04-15 17:38:31.216 [[32mpool-9-thread-13[0;39m] [1;31mERROR[0;39m [[1;35mm.i.s.n.d.AmqpNotificationListenerManager[0;39m] - 通知处理器抛出异常 logUUid=e14353a9a39142b4948c3c581f1e5e92, 队列=NOTIFICATION.V1.FilterResultRecordNotification#handleFilterResultRecordNotificationTurnWrightWorker, 消息体={"mqMsgFrom":"metadata-config-center@test-k8s-192.168.245.158","mqMsgSendTime":1776245520056,"recordCreateTime":null,"orderId":2041437754707701769,"serialNo":435542153989980571,"filterStatus":1,"filteredSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"filteredServiceRemark":"","filteredOwnerAddress":"天安门城楼123","filteredDeliveryAddress":"","originalSubOrderContentList":[{"subOrderId":2041437754707701768,"skuRemark":"(需求说明:1212121)","otherSkuName":"1111111111"}],"originalServiceRemark":null,"originalOwnerAddress":"天安门城楼123","originalDeliveryAddress":null,"filterSuccess":true} org.springframework.dao.OptimisticLockingFailureException: Cannot save entity 435542993652417150 with version 1 to collection OrderTextAiCheck; Has it been modified meanwhile org.springframework.data.mongodb.core.MongoTemplate.doSaveVersioned(MongoTemplate.java:1509) at org.springframework.data.mongodb.core.MongoTemplate.save(MongoTemplate.java:1472) at 
org.springframework.data.mongodb.repository.support.SimpleMongoRepository.save(SimpleMongoRepository.java:99) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:360) at org.springframework.data.repository.core.support.RepositoryMethodInvoker$RepositoryFragmentMethodInvoker.lambda$new$0(RepositoryMethodInvoker.java:277) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.doInvoke(RepositoryMethodInvoker.java:170) at org.springframework.data.repository.core.support.RepositoryMethodInvoker.invoke(RepositoryMethodInvoker.java:158) at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:515) at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:284) at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:731) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:174) at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:149) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:69) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:18
从最新的日志可以清楚地看到问题的全貌。这是严重的消息重复和并发处理问题。让我详细分析并提供解决方案:
问题分析
1. 问题现象
- 同一消息被多次重复投递
- 每条消息都先后被两个消费线程处理(从日志时间看,第二个线程是在第一个线程失败后几毫秒才开始的,更像是失败后的重新投递,而不是真正的同时处理)
- 所有处理都因乐观锁冲突失败
2. 具体时间线
17:38:29.353 - Thread-32 开始处理消息 serialNo:435542927080424275
17:38:29.365 - Thread-28 开始处理同样的消息 (比Thread-32开始晚12毫秒,且晚于Thread-32在17:38:29.362的失败)
17:38:29.362 - Thread-32 失败 (乐观锁)
17:38:29.373 - Thread-28 失败 (乐观锁)
17:38:30.655 - Thread-34 开始处理另一消息 serialNo:435533907649102714
17:38:30.664 - Thread-4 开始处理同样的消息 (比Thread-34开始晚9毫秒,且晚于Thread-34在17:38:30.662的失败)
17:38:30.662 - Thread-34 失败
17:38:30.672 - Thread-4 失败
17:38:31.192 - Thread-9 开始处理消息 serialNo:435542153989980571
17:38:31.205 - Thread-13 开始处理同样的消息 (比Thread-9开始晚13毫秒,且晚于Thread-9在17:38:31.201的失败)
17:38:31.201 - Thread-9 失败
17:38:31.216 - Thread-13 失败
3. 根本原因
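- 消息被重复投递 - 可能是生产者发送了多次,也可能是消费失败后消息被重试或重新入队(堆栈中可以看到RetryOperationsInterceptor),需要进一步确认
- RabbitMQ工作队列模式 - 多个消费者同时消费同一队列(注意:队列中的一条消息同一时刻只会投递给一个消费者,多消费者本身不会让同一条消息被并发处理)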
- 缺乏幂等性处理 - 没有检查消息是否已处理
- 缺乏分布式锁 - 同一订单的消息被并发处理
完整解决方案
方案1:立即修复代码(最紧急)
/**
 * Listener for {@code FilterResultRecordNotification} messages.
 *
 * <p>Each notification goes through the same sequence: a Redis "already processed" marker is
 * checked; a Redisson lock keyed by the order id is taken; under that lock the repository is
 * asked whether the (orderId, serialNo) pair is already stored; if not, the record is upserted
 * through {@link MongoTemplate}; finally the Redis marker is written with a 24-hour expiry.
 */
@Component
@Slf4j
public class OrderTextAiCheckNotificationListener {

    @Autowired
    private RedissonClient redissonClient;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    private OrderTextAiCheckRepository orderTextAiCheckRepository;

    /**
     * Handles one filter-result notification.
     *
     * <p>Declared retryable for {@code OptimisticLockingFailureException}: up to 3 attempts,
     * starting at a 100 ms delay with a multiplier of 2.
     *
     * @param notification the incoming notification; its order id and serial number form the
     *                     de-duplication key
     * @throws RuntimeException if the order lock is not obtained within 3 seconds, if the thread
     *                          is interrupted while waiting, or if processing fails (the original
     *                          exception is logged and rethrown unchanged)
     */
    @Retryable(
        value = OptimisticLockingFailureException.class,
        maxAttempts = 3,
        backoff = @Backoff(delay = 100, multiplier = 2)
    )
    public void handleFilterResultRecordNotificationTurnWrightWorker(
            FilterResultRecordNotification notification) {
        final String dedupKey = generateMessageKey(notification);

        // Step 1: cheap Redis check so known duplicates never touch the lock or the database.
        if (isMessageProcessed(dedupKey)) {
            log.info("⚠️ 消息已处理,跳过: orderId={}, serialNo={}",
                notification.getOrderId(), notification.getSerialNo());
            return;
        }

        // Step 2: one lock per order id, so notifications of the same order run one at a time.
        final RLock orderLock = redissonClient.getLock("order:process:" + notification.getOrderId());
        try {
            // Wait at most 3 seconds for the lock; the lease is 10 seconds.
            if (!orderLock.tryLock(3, 10, TimeUnit.SECONDS)) {
                log.warn("⚠️ 获取分布式锁失败,orderId={}", notification.getOrderId());
                // Propagate so the caller's failure handling decides what happens to the message.
                throw new RuntimeException("系统繁忙,请稍后重试");
            }
            try {
                upsertUnlessPresent(notification, dedupKey);
            } finally {
                // Release only when this thread still owns the lock.
                if (orderLock.isHeldByCurrentThread() && orderLock.isLocked()) {
                    orderLock.unlock();
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating to an unchecked exception.
            Thread.currentThread().interrupt();
            log.error("❌ 处理被中断: orderId={}", notification.getOrderId(), e);
            throw new RuntimeException("处理被中断", e);
        } catch (Exception e) {
            log.error("❌ 处理失败: orderId={}, serialNo={}",
                notification.getOrderId(), notification.getSerialNo(), e);
            throw e;
        }
    }

    /**
     * Steps 3 to 5, executed while the order lock is held: re-check the database, upsert the
     * record when it is absent, then write the Redis marker.
     */
    private void upsertUnlessPresent(FilterResultRecordNotification notification, String dedupKey) {
        if (existsInDatabase(notification)) {
            log.info("✅ 数据库已存在,跳过: orderId={}, serialNo={}",
                notification.getOrderId(), notification.getSerialNo());
            // Remember the result in Redis so the next duplicate skips the database lookup.
            markMessageProcessed(dedupKey);
            return;
        }

        processWithAtomicOperation(notification);
        markMessageProcessed(dedupKey);
        log.info("✅ 消息处理成功: orderId={}, serialNo={}",
            notification.getOrderId(), notification.getSerialNo());
    }

    /**
     * Upserts the record for the notification's (orderId, serialNo) pair with a single
     * {@code findAndModify} call.
     *
     * @throws RuntimeException if {@code findAndModify} returns {@code null}
     */
    private void processWithAtomicOperation(FilterResultRecordNotification notification) {
        Query byOrderAndSerial = Query.query(
            Criteria.where("orderId").is(notification.getOrderId())
                .and("serialNo").is(notification.getSerialNo()));

        // Scalar fields copied straight from the notification.
        Update changes = new Update()
            .set("orderId", notification.getOrderId())
            .set("serialNo", notification.getSerialNo())
            .set("filterStatus", notification.getFilterStatus())
            .set("filterSuccess", notification.isFilterSuccess())
            .set("mqMsgFrom", notification.getMqMsgFrom())
            .set("mqMsgSendTime", notification.getMqMsgSendTime())
            .set("updateTime", new Date());

        // List fields are written only when present, as a copy of the incoming list.
        if (notification.getFilteredSubOrderContentList() != null) {
            changes.set("filteredSubOrderContentList",
                new ArrayList<>(notification.getFilteredSubOrderContentList()));
        }
        if (notification.getOriginalSubOrderContentList() != null) {
            changes.set("originalSubOrderContentList",
                new ArrayList<>(notification.getOriginalSubOrderContentList()));
        }

        // String fields: null is stored as the empty string.
        changes
            .set("filteredServiceRemark",
                StringUtils.defaultString(notification.getFilteredServiceRemark()))
            .set("filteredOwnerAddress",
                StringUtils.defaultString(notification.getFilteredOwnerAddress()))
            .set("filteredDeliveryAddress",
                StringUtils.defaultString(notification.getFilteredDeliveryAddress()))
            .set("originalServiceRemark",
                StringUtils.defaultString(notification.getOriginalServiceRemark()))
            .set("originalOwnerAddress",
                StringUtils.defaultString(notification.getOriginalOwnerAddress()))
            .set("originalDeliveryAddress",
                StringUtils.defaultString(notification.getOriginalDeliveryAddress()));

        // Written only when the upsert inserts a new document.
        changes.setOnInsert("createTime", new Date());
        changes.setOnInsert("version", 0);

        // Upsert, and ask for the document as it looks after the modification.
        FindAndModifyOptions upsertReturningNew = FindAndModifyOptions.options()
            .upsert(true)
            .returnNew(true);

        OrderTextAiCheck stored = mongoTemplate.findAndModify(
            byOrderAndSerial, changes, upsertReturningNew, OrderTextAiCheck.class);
        if (stored == null) {
            throw new RuntimeException("处理失败,未找到或创建记录");
        }
    }

    /**
     * Builds the Redis key that marks a notification as processed.
     */
    private String generateMessageKey(FilterResultRecordNotification notification) {
        String pattern = "msg:processed:order:%d:serial:%d";
        return String.format(pattern, notification.getOrderId(), notification.getSerialNo());
    }

    /**
     * Reports whether the processed marker exists in Redis. Any Redis failure is logged and
     * treated as "not processed" so the message is still handled.
     */
    private boolean isMessageProcessed(String messageKey) {
        try {
            Boolean present = redisTemplate.hasKey(messageKey);
            return present != null && present;
        } catch (Exception e) {
            log.warn("Redis检查失败,降级处理: key={}", messageKey, e);
            return false;
        }
    }

    /**
     * Writes the processed marker with a 24-hour expiry. A Redis failure is logged and ignored.
     */
    private void markMessageProcessed(String messageKey) {
        try {
            redisTemplate.opsForValue().set(messageKey, "1", 24, TimeUnit.HOURS);
        } catch (Exception e) {
            // Best effort: the marker is an optimisation, so the main flow continues.
            log.warn("Redis标记失败: key={}", messageKey, e);
        }
    }

    /**
     * Asks the repository whether a record for the (orderId, serialNo) pair exists. A lookup
     * failure is logged and treated as "does not exist" so processing continues.
     */
    private boolean existsInDatabase(FilterResultRecordNotification notification) {
        try {
            return orderTextAiCheckRepository.existsByOrderIdAndSerialNo(
                notification.getOrderId(), notification.getSerialNo());
        } catch (Exception e) {
            log.warn("数据库检查失败,降级处理: orderId={}, serialNo={}",
                notification.getOrderId(), notification.getSerialNo(), e);
            return false;
        }
    }
}
方案2:配置RabbitMQ(防止重复投递)
# application.yml
spring:
  rabbitmq:
    # Producer settings.
    # `publisher-confirms: true` was replaced by `publisher-confirm-type` in newer Spring Boot
    # versions; `correlated` delivers confirms to the RabbitTemplate confirm callback.
    publisher-confirm-type: correlated
    publisher-returns: true
    # Consumer settings.
    # NOTE(review): these keys configure the "simple" container type only. The stack trace in
    # the logs shows a DirectMessageListenerContainer, so confirm which container the
    # application really builds and whether it reads these properties at all.
    listener:
      simple:
        acknowledge-mode: manual          # the listener must ack/nack every delivery itself
        prefetch: 1                       # at most one unacknowledged message per consumer
        concurrency: 2                    # initial number of consumers
        max-concurrency: 4
        retry:
          enabled: false                  # disable Spring's in-process listener retry
        default-requeue-rejected: false   # rejected messages are not put back on the queue
    # Connection settings.
    connection-timeout: 5000
/**
 * RabbitMQ wiring for the filter-result notification queue: the work queue with its
 * dead-letter arguments, the dead-letter exchange/queue/binding, a JSON message converter and
 * a manual-ack listener container factory.
 *
 * <p>NOTE(review): the work queue is declared here with extra arguments. If a queue with the
 * same name already exists on the broker with different arguments, the declaration is expected
 * to be refused - confirm before deploying.
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Declares the durable work queue. Its arguments route dead-lettered messages to
     * {@code dlx.exchange} with routing key {@code filter.result.dlx}, set a 60000 ms message
     * TTL and cap the queue length at 10000.
     */
    @Bean
    public Queue filterResultQueue() {
        Map<String, Object> queueArguments = new HashMap<>();
        // Dead-letter target.
        queueArguments.put("x-dead-letter-exchange", "dlx.exchange");
        queueArguments.put("x-dead-letter-routing-key", "filter.result.dlx");
        // Per-message TTL in milliseconds (one minute).
        queueArguments.put("x-message-ttl", 60000);
        // Maximum number of messages kept in the queue.
        queueArguments.put("x-max-length", 10000);

        final String queueName =
            "NOTIFICATION.V1.FilterResultRecordNotification#handleFilterResultRecordNotificationTurnWrightWorker";
        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue(queueName, durable, exclusive, autoDelete, queueArguments);
    }

    /** Declares the dead-letter exchange. */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("dlx.exchange");
    }

    /** Declares the durable queue that receives dead-lettered messages. */
    @Bean
    public Queue dlxQueue() {
        return new Queue("filter.result.dlx.queue", true);
    }

    /** Binds the dead-letter queue to the dead-letter exchange under {@code filter.result.dlx}. */
    @Bean
    public Binding dlxBinding() {
        Queue deadLetterQueue = dlxQueue();
        DirectExchange deadLetterExchange = dlxExchange();
        return BindingBuilder.bind(deadLetterQueue)
            .to(deadLetterExchange)
            .with("filter.result.dlx");
    }

    /** JSON converter for message payloads. */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Listener container factory with manual acknowledgement, a prefetch of one message, and
     * between 2 and 4 concurrent consumers. Listener errors go through
     * {@link CustomExceptionStrategy}.
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        containerFactory.setMessageConverter(messageConverter());
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // One unacknowledged message per consumer at a time.
        containerFactory.setPrefetchCount(1);
        containerFactory.setConcurrentConsumers(2);
        containerFactory.setMaxConcurrentConsumers(4);

        ConditionalRejectingErrorHandler errorHandler =
            new ConditionalRejectingErrorHandler(new CustomExceptionStrategy());
        containerFactory.setErrorHandler(errorHandler);
        return containerFactory;
    }

    /**
     * Exception strategy that never classifies a failure as fatal when its direct cause is an
     * {@code OptimisticLockingFailureException}; every other failure is classified by the
     * default strategy.
     */
    public static class CustomExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

        @Override
        public boolean isFatal(Throwable t) {
            boolean optimisticLockConflict = t.getCause() instanceof OptimisticLockingFailureException;
            return !optimisticLockConflict && super.isFatal(t);
        }
    }
}
方案3:手动确认消息
/**
 * Manual-acknowledgement entry point for the filter-result notification queue.
 *
 * <p>Delegates processing to {@link OrderTextAiCheckNotificationListener} and then acks,
 * requeues or rejects the delivery:
 * <ul>
 *   <li>success: ack;</li>
 *   <li>retryable failure: requeue once, otherwise reject without requeue;</li>
 *   <li>non-retryable failure: log and ack.</li>
 * </ul>
 */
@Component
@Slf4j
public class ManualAckMessageListener {

    /** Upper bound on dead-letter cycles (taken from the {@code x-death} header) before giving up. */
    private static final int MAX_RETRIES = 3;

    @Autowired
    private OrderTextAiCheckNotificationListener notificationListener;

    /**
     * Processes one delivery and settles it on the channel.
     *
     * @param notification the converted message payload
     * @param message      the raw message; its properties supply the delivery tag, message id,
     *                     redelivered flag and headers
     * @param channel      the channel the delivery arrived on
     * @throws IOException if the ack or nack cannot be sent
     */
    @RabbitListener(
        queues = "NOTIFICATION.V1.FilterResultRecordNotification#handleFilterResultRecordNotificationTurnWrightWorker",
        containerFactory = "rabbitListenerContainerFactory"
    )
    public void onMessage(
            FilterResultRecordNotification notification,
            Message message,
            Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String messageId = message.getMessageProperties().getMessageId();

        try {
            notificationListener.handleFilterResultRecordNotificationTurnWrightWorker(notification);
        } catch (Exception e) {
            log.error("消息处理失败: messageId={}, deliveryTag={}",
                messageId, deliveryTag, e);
            settleFailedDelivery(message, channel, deliveryTag, messageId, e);
            return;
        }

        // Ack outside the try block: a failing ack must not be handled as a processing failure
        // (which would attempt a nack for work that already succeeded).
        channel.basicAck(deliveryTag, false);
        log.info("消息处理成功并确认: messageId={}, deliveryTag={}",
            messageId, deliveryTag);
    }

    /** Requeues, rejects or acks a delivery whose processing threw {@code failure}. */
    private void settleFailedDelivery(Message message, Channel channel, long deliveryTag,
            String messageId, Exception failure) throws IOException {
        if (!shouldRetry(message, failure)) {
            // Not retryable: ack so the message does not loop forever.
            log.error("消息处理失败,不重试: messageId={}", messageId);
            channel.basicAck(deliveryTag, false);
            return;
        }

        int retryCount = getRetryCount(message);
        // The x-death header is only updated when a message is dead-lettered, not when it is
        // requeued with basicNack(requeue=true). Without a second guard every failing message
        // would be requeued forever, so the broker's redelivered flag limits requeueing to once.
        boolean alreadyRedelivered =
            Boolean.TRUE.equals(message.getMessageProperties().isRedelivered());

        if (retryCount < MAX_RETRIES && !alreadyRedelivered) {
            log.warn("消息重新入队: messageId={}, 重试次数={}",
                messageId, retryCount);
            channel.basicNack(deliveryTag, false, true);
        } else {
            // Reject without requeue; the queue's dead-letter settings decide where it goes.
            log.error("超过最大重试次数,进入死信队列: messageId={}", messageId);
            channel.basicNack(deliveryTag, false, false);
        }
    }

    /**
     * Decides whether a failure is worth retrying.
     *
     * <p>An {@code OptimisticLockingFailureException} counts whether it is the thrown exception
     * itself or anywhere in its cause chain: the delegate listener rethrows the original
     * exception, so checking only {@code getCause()} would miss it. A top-level
     * {@code AmqpException} is also retryable.
     */
    private boolean shouldRetry(Message message, Exception e) {
        for (Throwable current = e; current != null; current = current.getCause()) {
            if (current instanceof OptimisticLockingFailureException) {
                return true;
            }
        }
        return e instanceof AmqpException;
    }

    /**
     * Reads the {@code count} of the first {@code x-death} header entry.
     *
     * @return the count, or 0 when the header is absent, empty or not in the expected shape
     */
    private int getRetryCount(Message message) {
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        if (headers == null) {
            return 0;
        }
        Object xDeath = headers.get("x-death");
        if (!(xDeath instanceof List)) {
            return 0;
        }
        List<?> deaths = (List<?>) xDeath;
        if (deaths.isEmpty() || !(deaths.get(0) instanceof Map)) {
            return 0;
        }
        Object count = ((Map<?, ?>) deaths.get(0)).get("count");
        if (!(count instanceof Number)) {
            return 0;
        }
        // The count may arrive as a Long; casting the boxed value straight to int would throw
        // ClassCastException, so go through Number and clamp.
        return (int) Math.min(((Number) count).longValue(), Integer.MAX_VALUE);
    }
}
方案4:消息轨迹追踪
/**
 * Stores and queries {@code MessageTrace} documents in the {@code message_trace} collection.
 */
@Component
@Slf4j
public class MessageTraceService {

    @Autowired
    private MongoTemplate mongoTemplate;

    /**
     * Saves one trace entry, stamped with the current time.
     *
     * @param messageId the message id to record
     * @param orderId   the order id to record
     * @param serialNo  the serial number to record
     * @param status    the status value to record
     * @param error     the error text to record
     */
    public void traceMessage(String messageId, Long orderId, Long serialNo,
            String status, String error) {
        mongoTemplate.save(
            MessageTrace.builder()
                .messageId(messageId)
                .orderId(orderId)
                .serialNo(serialNo)
                .status(status)
                .error(error)
                .processTime(new Date())
                .build(),
            "message_trace");
    }

    /**
     * Tells whether a trace with status {@code SUCCESS} exists for the given order id and
     * serial number.
     *
     * @return {@code true} if at least one matching trace document exists
     */
    public boolean isMessageSuccessfullyProcessed(Long orderId, Long serialNo) {
        Criteria successfulTrace = Criteria.where("orderId").is(orderId)
            .and("serialNo").is(serialNo)
            .and("status").is("SUCCESS");
        return mongoTemplate.exists(Query.query(successfulTrace), MessageTrace.class);
    }
}
/**
 * One message-trace entry, mapped to the {@code message_trace} collection.
 * Accessors and a builder come from the Lombok {@code @Data} and {@code @Builder} annotations.
 */
@Data
@Builder
@Document(collection = "message_trace")
class MessageTrace {
// Document identifier.
@Id
private String id;
// Message id recorded for the trace.
private String messageId;
// Order id recorded for the trace.
private Long orderId;
// Serial number recorded for the trace.
private Long serialNo;
// Status value; listed values: RECEIVED, PROCESSING, SUCCESS, FAILED
private String status; // RECEIVED, PROCESSING, SUCCESS, FAILED
// Error text recorded for the trace.
private String error;
// Time the trace entry was written.
private Date processTime;
}
方案5:监控和告警
@Component
@Slf4j
public class MessageMonitor {
@Autowired
private RedisTemplate<String, String> redisTemplate;
// 监控指标
private final Map<String, AtomicInteger> metrics = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
metrics.put("total_messages", new AtomicInteger(0));
metrics.put("success_messages", new AtomicInteger(0));
metrics.put("failed_messages", new AtomicInteger(0));
metrics.put("duplicate_messages", new AtomicInteger(0));
metrics.put("lock_timeout_messages", new AtomicInteger(0));
// 定时上报监控指标
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(this::reportMetrics, 1, 1, TimeUnit.MINUTES);
}
/**
* 记录消息处理指标
*/
public void recordMetric(String metricName) {
AtomicInteger counter = metrics.get(metricName);
if (counter != null) {
counter.incrementAndGet();
}
// 实时告警
if ("failed_messages".equals(metricName)) {
checkAndAlert();
}
}
/**
* 检查并告警
*/
private void checkAndAlert() {
int failedCount = metrics.get("failed_messages").get();
int totalCount = metrics.get("total_messages").get();
if (totalCount > 0) {
double failureRate = (double) failedCount / totalCount;
if (failureRate > 0.1) { // 失败率超过10%
log.error("⚠️ 消息处理失败率过高: {}/{} ({}%)",
failedCount, totalCount, failureRate * 100);
// 发送告警通知
sendAlert("消息处理失败率过高",
String.format("失败率: %.2f%%,请立即检查!", failureRate * 100));
}
}
}
private void reportMetrics() {
log.info("📊 消息处理监控指标:");
metrics.forEach((key, value) -> {
log.info(" {}: {}", key, value.get());
});
// 重置计数器
metrics.values().forEach(counter -> counter.set(0));
}
private void sendAlert(String title, String message) {
// 实现告警发送逻辑(邮件、钉钉、企业微信等)
}
}
实施步骤
第一阶段:立即修复(今天)
- 在现有代码中添加Redis幂等性检查
- 添加Redisson分布式锁
- 将Repository.save()改为MongoTemplate.findAndModify()
第二阶段:配置优化(1-2天)
- 配置RabbitMQ死信队列
- 配置消息TTL和最大重试次数
- 实现手动确认模式
第三阶段:监控完善(3-5天)
- 添加消息轨迹追踪
- 实现监控告警
- 添加仪表板展示关键指标
关键配置
1. Redis配置
spring:
redis:
host: ${REDIS_HOST:localhost}
port: ${REDIS_PORT:6379}
password: ${REDIS_PASSWORD:}
database: 0
timeout: 3000ms
lettuce:
pool:
max-active: 8
max-wait: -1ms
max-idle: 8
min-idle: 0
2. Redisson配置
redisson:
address: "redis://${spring.redis.host}:${spring.redis.port}"
password: ${spring.redis.password}
database: ${spring.redis.database:0}
timeout: ${spring.redis.timeout:3000}
connection-pool-size: ${spring.redis.lettuce.pool.max-active:8}
3. MongoDB配置
spring:
data:
mongodb:
uri: mongodb://${MONGO_HOST:localhost}:${MONGO_PORT:27017}/turnright
auto-index-creation: true
创建MongoDB索引
// Unique compound index: at most one document per (orderId, serialNo) pair.
// NOTE(review): building a unique index is expected to fail if the collection already
// contains duplicate pairs - check and clean existing data first.
db.OrderTextAiCheck.createIndex(
{ orderId: 1, serialNo: 1 },
{ unique: true, name: "uk_order_serial" }
);
// Secondary indexes for queries.
// NOTE(review): the { orderId: 1 } index looks redundant, since orderId is the leading field
// of the compound index above - confirm against the query patterns before keeping it.
db.OrderTextAiCheck.createIndex({ orderId: 1 });
db.OrderTextAiCheck.createIndex({ serialNo: 1 });
db.OrderTextAiCheck.createIndex({ createTime: -1 });
这个完整的解决方案能彻底解决:
- ✅ 消息重复投递问题
- ✅ 并发处理冲突问题
- ✅ 乐观锁异常问题
- ✅ 系统可靠性问题
- ✅ 监控和可观测性问题
消息被重复投递 - 生产者可能发送了多次?这是根本原因吗?
你说得非常对!生产者发送多次不是根本原因,而是表象。让我帮你深入分析真正的根本原因。
真正的根本原因分析
1. RabbitMQ消息确认机制问题
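仅从日志无法直接判断消费者使用的确认模式。如果系统使用的是Broker级别的自动确认(no-ack,对应Spring AMQP的AcknowledgeMode.NONE),那将是一个严重的问题;需要注意,Spring AMQP默认的AUTO模式是在监听器正常返回后才确认,与这里所说的自动确认不同: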
// Illustration of how the system is probably configured (an assumption, not taken from the real code).
@RabbitListener(queues = "your.queue")
public void handleMessage(Message message) {
// Process the message...
// No explicit ack is sent here, so acknowledgement is left to the listener container.
// NOTE(review): Spring AMQP's default AUTO mode acks only after the listener returns
// normally; it is not the broker-level no-ack mode - confirm the actual container settings.
}
自动确认的问题:
- 消息一旦被推送给消费者,RabbitMQ立即标记为已传递
- 如果消费者处理失败,消息已经丢失(不会重试)
- 另外,生产者如果没有收到Broker的发布确认(publisher confirm)而重发,也会产生重复消息(注意:消费者的ack不会传递给生产者,两者是独立的机制)
2. 生产者确认机制缺失
生产者发送消息后:
// Publisher confirms are not enabled (assumed scenario).
rabbitTemplate.convertAndSend(exchange, routingKey, message);
// The call does not wait for a confirm, so the sender cannot tell whether the message reached RabbitMQ.
3. 消息持久化问题
消息可能没有持久化:
// The message may have been built without an explicit persistence setting.
// NOTE(review): Spring AMQP's MessageProperties is documented to default to PERSISTENT
// delivery mode, so a missing setDeliveryMode call may not mean the message is transient -
// confirm before treating this as a cause.
MessageProperties props = new MessageProperties();
// props.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // this explicit call is absent
Message message = new Message(body.getBytes(), props);
4. 网络问题导致重发
生产者 → 网络抖动 → RabbitMQ(消息可能已收到)
↓
生产者没收到确认 → 认为发送失败 → 重发
↓
RabbitMQ收到两条相同的消息
根本原因验证
让我们通过日志来验证:
观察1:两条消息的内容完全相同
第一个消息:
"mqMsgFrom":"metadata-config-center@test-k8s-192.168.18.238"
"mqMsgSendTime":1776245880074
第二个消息(完全相同):
"mqMsgFrom":"metadata-config-center@test-k8s-192.168.18.238"
"mqMsgSendTime":1776245880074
内容完全相同,却被两个消费者线程分别处理 → 同一队列中的一条消息在同一时刻只会投递给一个消费者,因此这更可能是两条独立的消息(或同一条消息被重新投递),而不是同一条消息被两个消费者同时处理。
观察2:时间戳分析
17:38:29.353 - 线程32开始处理
17:38:29.365 - 线程28开始处理
相差12毫秒
如果是同一个消息被两个消费者竞争,时间差应该在微秒级别。12毫秒的差异说明这是两条独立的消息。
观察3:实体ID不同
线程32: entity 435542989361119823
线程28: entity 435542989361119824
不同的实体ID,说明两个消费者创建了两个不同的实体对象,然后都尝试保存到同一个订单+serialNo的记录。
真正的根本原因链条
生产者发送消息
↓
RabbitMQ收到并存储
↓
消费者1获取消息(自动确认)
↓
消费者1处理中(还未完成)
↓
生产者没收到确认,超时重发
↓
RabbitMQ收到重复消息
↓
消费者2获取重复消息
↓
两个消费者同时处理 → 乐观锁冲突
解决方案(从根上解决)
方案1:修复消息确认机制(最紧急)
# application.yml
spring:
  rabbitmq:
    # 1. Enable publisher confirms and returns
    publisher-confirm-type: correlated
    publisher-returns: true
    # 2. Switch consumers to manual acknowledgement
    listener:
      simple:
        acknowledge-mode: manual   # key change: the listener must ack/nack explicitly
        prefetch: 1
        concurrency: 1
        max-concurrency: 2
    # 3. Return unroutable messages to the publisher (required for publisher-returns).
    #    This flag does NOT make messages persistent: persistence comes from the
    #    per-message delivery mode together with durable queues.
    template:
      mandatory: true
/**
 * Publishes messages as persistent JSON and blocks until the broker confirms them.
 *
 * <p>Each send registers a future in {@link #confirmMap} under a generated message id; the
 * confirm callback installed in {@link #init()} completes that future when the broker
 * acks or nacks the publish.
 */
@Component
@Slf4j
public class ReliableMessageProducer {

    /** How long {@link #sendReliably} waits for the broker's publisher confirm. */
    private static final long CONFIRM_TIMEOUT_SECONDS = 3;

    /** Shared mapper, created once instead of on every send. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Pending confirms keyed by message id; entries are removed on confirm or when the send ends. */
    private final Map<String, CompletableFuture<Boolean>> confirmMap = new ConcurrentHashMap<>();

    /** Installs the confirm and returns callbacks on the shared template. */
    @PostConstruct
    public void init() {
        // Confirm callback: resolves the future of the matching in-flight send.
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (correlationData == null) {
                return;
            }
            String messageId = correlationData.getId();
            CompletableFuture<Boolean> future = confirmMap.remove(messageId);
            if (future != null) {
                future.complete(ack);
            }
            if (!ack) {
                log.error("消息发送失败: messageId={}, cause={}", messageId, cause);
            }
        });
        // Returns callback: invoked when a message could not be routed.
        rabbitTemplate.setReturnsCallback(returned -> {
            log.error("消息不可达: exchange={}, routingKey={}, replyCode={}, replyText={}",
                    returned.getExchange(), returned.getRoutingKey(),
                    returned.getReplyCode(), returned.getReplyText());
        });
    }

    /**
     * Sends {@code message} as persistent JSON and waits for the broker confirm.
     *
     * <p>A {@code false} result means "not confirmed", not necessarily "not delivered": after a
     * timeout the broker may still have accepted the message, so callers that retry must be
     * prepared for duplicates.
     *
     * @param exchange   target exchange
     * @param routingKey routing key
     * @param message    payload, serialized to JSON
     * @return {@code true} only if the broker acked within the timeout; {@code false} on nack,
     *         timeout, interruption, serialization failure or any send error
     */
    public boolean sendReliably(String exchange, String routingKey, Object message) {
        String messageId = UUID.randomUUID().toString();
        CompletableFuture<Boolean> confirmFuture = new CompletableFuture<>();
        confirmMap.put(messageId, confirmFuture);
        try {
            MessageProperties props = new MessageProperties();
            props.setMessageId(messageId);
            props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            props.setContentType("application/json");
            props.setTimestamp(new Date());
            // Serialization is inside the try so that a JSON failure is reported as a failed
            // send instead of escaping as an undeclared checked exception.
            Message amqpMessage = new Message(OBJECT_MAPPER.writeValueAsBytes(message), props);

            rabbitTemplate.convertAndSend(exchange, routingKey, amqpMessage,
                    new CorrelationData(messageId));

            Boolean ack = confirmFuture.get(CONFIRM_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (Boolean.TRUE.equals(ack)) {
                log.info("消息发送成功: messageId={}", messageId);
                return true;
            }
            log.error("消息发送失败: messageId={}", messageId);
            return false;
        } catch (TimeoutException e) {
            log.error("消息确认超时: messageId={}", messageId);
            return false;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("消息发送异常: messageId={}", messageId, e);
            return false;
        } catch (Exception e) {
            log.error("消息发送异常: messageId={}", messageId, e);
            return false;
        } finally {
            // Without this, every timed-out or failed send would leave its future in the map forever.
            confirmMap.remove(messageId);
        }
    }
}
方案2:消费者手动确认
/**
 * Consumes {@code FilterResultRecordNotification} messages with manual acknowledgement.
 *
 * <p>A message is acked after it was processed (or recognised as already processed) and
 * nacked on failure: requeued while the retry budget lasts, otherwise rejected without
 * requeue so the broker can dead-letter it.
 */
@Component
@Slf4j
public class ReliableMessageConsumer {

    /** Maximum number of dead-letter cycles before a message is given up on. */
    private static final int MAX_RETRIES = 3;

    @Autowired
    private OrderTextAiCheckService orderTextAiCheckService;

    /**
     * Handles one delivery and acks/nacks it explicitly.
     *
     * @param notification converted message payload
     * @param message      raw AMQP message (delivery tag, headers)
     * @param channel      channel the delivery arrived on, used for ack/nack
     * @throws IOException if the nack itself fails
     */
    @RabbitListener(
            queues = "NOTIFICATION.V1.FilterResultRecordNotification#handleFilterResultRecordNotificationTurnWrightWorker",
            ackMode = "MANUAL"
    )
    public void handleMessage(
            FilterResultRecordNotification notification,
            Message message,
            Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String messageId = message.getMessageProperties().getMessageId();
        try {
            // 1. Idempotency check.
            // NOTE(review): check-then-process is only safe if isMessageProcessed/processNotification
            // are atomic together (e.g. backed by a unique index) — confirm in the service.
            if (orderTextAiCheckService.isMessageProcessed(
                    notification.getOrderId(), notification.getSerialNo())) {
                log.info("消息已处理,直接确认: messageId={}", messageId);
                channel.basicAck(deliveryTag, false);
                return;
            }
            // 2. Process the message.
            orderTextAiCheckService.processNotification(notification);
            // 3. Success: acknowledge manually.
            channel.basicAck(deliveryTag, false);
            log.info("消息处理成功: messageId={}", messageId);
        } catch (Exception e) {
            log.error("消息处理失败: messageId={}", messageId, e);
            // 4. Failure: decide between requeue and dead-lettering.
            if (shouldRetry(message, e)) {
                channel.basicNack(deliveryTag, false, true);
                log.warn("消息重新入队: messageId={}", messageId);
            } else {
                // requeue=false: the broker dead-letters the message if a DLX is configured.
                channel.basicNack(deliveryTag, false, false);
                log.error("消息进入死信队列: messageId={}", messageId);
            }
        }
    }

    /**
     * Decides whether a failed message should be retried.
     *
     * <p>The retry budget is checked first so that it applies to every exception type.
     *
     * @param message the failed delivery
     * @param e       the processing failure (currently every type is treated as retryable)
     * @return {@code true} while the retry count is below {@link #MAX_RETRIES}
     */
    private boolean shouldRetry(Message message, Exception e) {
        int retryCount = getRetryCount(message);
        if (retryCount >= MAX_RETRIES) {
            log.warn("超过最大重试次数: {}", retryCount);
            return false;
        }
        return true;
    }

    /**
     * Reads the retry count from the first {@code x-death} header entry.
     *
     * <p>NOTE(review): the broker writes {@code x-death} only when a message is dead-lettered,
     * so a plain requeue does not advance this counter — a dead-letter based retry loop is
     * needed for the limit to take effect.
     *
     * @param message the delivery to inspect
     * @return the recorded count, or 0 when the header is absent or malformed
     */
    private int getRetryCount(Message message) {
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        if (headers == null) {
            return 0;
        }
        Object deaths = headers.get("x-death");
        if (!(deaths instanceof List) || ((List<?>) deaths).isEmpty()) {
            return 0;
        }
        Object first = ((List<?>) deaths).get(0);
        if (!(first instanceof Map)) {
            return 0;
        }
        // The count arrives as a Long; going through Number avoids a ClassCastException.
        Object count = ((Map<?, ?>) first).get("count");
        return count instanceof Number ? ((Number) count).intValue() : 0;
    }
}
方案3:生产者端幂等性
/**
 * Producer-side de-duplication: sends a notification at most once per
 * (orderId, serialNo) pair within a short time window, using a Redis marker key.
 */
@Component
@Slf4j
public class IdempotentMessageProducer {

    /** Lifetime of the "already sent" marker, in minutes. */
    private static final long SENT_MARKER_TTL_MINUTES = 5;

    /** Performs the confirmed send; this class only adds the de-duplication layer. */
    @Autowired
    private ReliableMessageProducer reliableMessageProducer;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * Sends the notification unless a send for the same order/serial is already recorded.
     *
     * <p>The marker is claimed atomically before sending, so two concurrent callers cannot
     * both pass the check, and released again if the send is not confirmed so a later
     * attempt can retry. Trade-off: a concurrent caller sees {@code true} while the first
     * send is still in flight.
     *
     * @param exchange     target exchange
     * @param routingKey   routing key
     * @param notification payload to send
     * @return {@code true} if the message was sent now or is already marked as sent;
     *         {@code false} if the send was not confirmed
     */
    public boolean sendIdempotent(String exchange, String routingKey,
                                  FilterResultRecordNotification notification) {
        String messageKey = generateMessageKey(notification);
        if (!claimSendMarker(messageKey)) {
            log.warn("消息已发送,跳过: orderId={}, serialNo={}",
                    notification.getOrderId(), notification.getSerialNo());
            return true;
        }
        boolean sent = false;
        try {
            sent = reliableMessageProducer.sendReliably(exchange, routingKey, notification);
            return sent;
        } finally {
            if (!sent) {
                // Covers both a false result and an exception: free the marker for a retry.
                redisTemplate.delete(messageKey);
            }
        }
    }

    /** Builds the Redis marker key from the order id and serial number. */
    private String generateMessageKey(FilterResultRecordNotification notification) {
        return String.format("sent:order:%d:serial:%d",
                notification.getOrderId(), notification.getSerialNo());
    }

    /**
     * Atomically creates the marker key with a TTL.
     *
     * @return {@code true} if this caller created the marker, {@code false} if it already existed
     */
    private boolean claimSendMarker(String messageKey) {
        Boolean created = redisTemplate.opsForValue()
                .setIfAbsent(messageKey, "1", SENT_MARKER_TTL_MINUTES, TimeUnit.MINUTES);
        return Boolean.TRUE.equals(created);
    }
}
方案4:完整的可靠消息方案
@Configuration
@Slf4j
public class ReliableMessagingConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 开启发布者确认
factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
factory.setPublisherReturns(true);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// 启用强制标志(消息不可达时返回)
template.setMandatory(true);
// 设置消息转换器
template.setMessageConverter(jsonMessageConverter());
// 配置重试
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
retryTemplate.setRetryPolicy(retryPolicy);
template.setRetryTemplate(retryTemplate);
return template;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 手动确认模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 每次只取一条消息
factory.setPrefetchCount(1);
// 并发消费者
factory.setConcurrentConsumers(2);
factory.setMaxConcurrentConsumers(4);
// 消息转换器
factory.setMessageConverter(jsonMessageConverter());
// 设置异常处理器
factory.setErrorHandler(errorHandler());
return factory;
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler(
new FatalExceptionStrategy() {
@Override
public boolean isFatal(Throwable t) {
// 只有致命异常才拒绝消息
return t instanceof MessageConversionException
|| t instanceof org.springframework.amqp.support.converter.MessageConversionException
|| t instanceof IllegalArgumentException;
}
}
);
}
}
诊断工具:检查当前配置
/**
 * Startup diagnostic: once the application is ready, logs the publisher confirm/return
 * settings of the connection factory and the consumer/message counts of the notification queue.
 */
@Component
@Slf4j
public class RabbitMQConfigChecker {

    /** Queue whose consumer and message counts are reported. */
    private static final String QUEUE_NAME =
            "NOTIFICATION.V1.FilterResultRecordNotification#handleFilterResultRecordNotificationTurnWrightWorker";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Logs the current RabbitMQ configuration after application startup. */
    @EventListener(ApplicationReadyEvent.class)
    public void checkConfig() {
        log.info("=== RabbitMQ配置检查 ===");
        ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();
        if (connectionFactory instanceof CachingConnectionFactory) {
            CachingConnectionFactory ccf = (CachingConnectionFactory) connectionFactory;
            log.info("发布者确认: {}", ccf.isPublisherConfirms());
            log.info("发布者返回: {}", ccf.isPublisherReturns());
        }
        checkQueueConfig();
    }

    /**
     * Logs consumer and message counts of {@link #QUEUE_NAME}.
     * Any failure (for example a missing queue) is logged and not propagated, so the
     * diagnostic can never block startup.
     */
    private void checkQueueConfig() {
        try {
            // Both counts are read on the same channel: [0] = consumers, [1] = messages.
            long[] counts = rabbitTemplate.execute(channel -> new long[] {
                    channel.consumerCount(QUEUE_NAME),
                    channel.messageCount(QUEUE_NAME)
            });
            if (counts != null) {
                log.info("队列消费者数量: {}", counts[0]);
                log.info("队列消息数量: {}", counts[1]);
            }
        } catch (Exception e) {
            log.error("检查队列配置失败", e);
        }
    }
}
实施优先级
第一阶段:立即实施(今天)
- 将消费者改为手动确认模式(最重要)
- 添加生产者确认机制
- 启用消息持久化
第二阶段:短期优化(1-2天)
- 实现生产者端幂等性
- 配置死信队列和重试策略
- 添加消息轨迹追踪
第三阶段:长期加固(1周内)
- 实现消息审计和监控
- 添加告警机制
- 压力测试和性能优化
根本原因总结
真正的根本原因是:消息确认机制不完善
- 生产者:发送后不确认是否成功,可能重复发送
- RabbitMQ:自动确认模式下,消息一旦推送就认为成功
- 消费者:处理失败时消息已丢失,无法重试
解决方案的核心:
- 生产者:发送确认 + 幂等性
- RabbitMQ:手动确认 + 持久化
- 消费者:幂等处理 + 错误重试
这样就能从根上解决消息重复和丢失的问题。