Selaa lähdekoodia

fix: 修复risk更新推送消息配置错误

hidewnd 4 päivää sitten
vanhempi
commit
3dc5f2bf1b

+ 2 - 44
yt-agent/agent-service/src/main/java/com/ytpm/config/mq/RabbitMqConfig.java

@@ -1,16 +1,10 @@
 package com.ytpm.config.mq;
 
 
-import org.springframework.amqp.core.Binding;
-import org.springframework.amqp.core.BindingBuilder;
 import org.springframework.amqp.core.DirectExchange;
-import org.springframework.amqp.core.Queue;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * risk关联对联适配
  *
@@ -20,50 +14,14 @@ import java.util.Map;
 @Configuration
 public class RabbitMqConfig {
 
-    public static final String RISK_BIZ_QUEUE = "risk.save.biz.queue";
-    public static final String RISK_BIZ_EXCHANGE = "risk.save.biz.exchange";
-    public static final String RISK_BIZ_ROUTING_KEY = "risk.save.biz.key";
-
-    public static final String RISK_DLX_QUEUE = "risk.save.dlx.queue";
-    public static final String RISK_DLX_EXCHANGE = "risk.save.dlx.exchange";
-    public static final String RISK_DLX_ROUTING_KEY = "risk.save.dlx.key";
-
-    // ---------------- 声明死信队列 ----------------
+    public static final String RISK_BIZ_EXCHANGE = "risk.biz.exchange";
 
-    @Bean
-    public Queue riskDlxQueue() {
-        return new Queue(RISK_DLX_QUEUE, true);
-    }
-
-    @Bean
-    public DirectExchange riskDlxExchange() {
-        return new DirectExchange(RISK_DLX_EXCHANGE);
-    }
-
-    @Bean
-    public Binding riskDlxBinding() {
-        return BindingBuilder.bind(riskDlxQueue()).to(riskDlxExchange()).with(RISK_DLX_ROUTING_KEY);
-    }
-
-
-    // ---------------- 声明业务队列 ----------------
-    @Bean
-    public Queue bizQueue() {
-        Map<String, Object> args = new HashMap<>();
-        // 动态指向该服务专属的死信交换机
-        args.put("x-dead-letter-exchange", RISK_DLX_EXCHANGE);
-        args.put("x-dead-letter-routing-key", RISK_DLX_ROUTING_KEY);
-        return new Queue(RISK_BIZ_QUEUE, true, false, false, args);
-    }
+    // ---------------- 声明交换机 ----------------
 
     @Bean
     public DirectExchange bizExchange() {
         return new DirectExchange(RISK_BIZ_EXCHANGE);
     }
 
-    @Bean
-    public Binding bizBinding() {
-        return BindingBuilder.bind(bizQueue()).to(bizExchange()).with(RISK_BIZ_ROUTING_KEY);
-    }
 
 }

+ 40 - 5
yt-agent/agent-service/src/main/java/com/ytpm/config/mq/ReliableMessageSender.java

@@ -4,6 +4,12 @@ package com.ytpm.config.mq;
 import cn.hutool.core.collection.CollectionUtil;
 import cn.hutool.core.util.StrUtil;
 import com.alibaba.fastjson.JSONArray;
+import com.ytpm.agent.model.YtApp;
+import com.ytpm.agent.model.YtPlatformUserApp;
+import com.ytpm.dao.AppMapper;
+import com.ytpm.dao.RiskMapper;
+import com.ytpm.risk.model.YtRiskTemplate;
+import com.ytpm.service.AgentAppService;
 import com.ytpm.util.RedisService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.rabbit.connection.CorrelationData;
@@ -12,8 +18,11 @@ import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 /**
  * @author hidewnd
@@ -27,6 +36,12 @@ public class ReliableMessageSender {
     private RabbitTemplate rabbitTemplate;
     @Resource
     private RedisService redisService;
+    @Resource
+    private AppMapper appMapper;
+    @Resource
+    private RiskMapper riskMapper;
+    @Resource
+    private AgentAppService agentAppService;
 
 
     // 初始化时设置回调
@@ -51,17 +66,37 @@ public class ReliableMessageSender {
     /**
      * 发送消息,带有异常降级处理
      */
-    public void sendRiskChangeToMq(List<String> appIds) {
-        if (CollectionUtil.isEmpty(appIds)) {
+    public void sendRiskChangeToMq(String templateId) {
+        YtRiskTemplate riskTemplate = StrUtil.isEmpty(templateId) ? null : riskMapper.selectOneTemplate(templateId);
+        if (riskTemplate == null) {
+            return;
+        }
+        String appId = riskTemplate.getAppId();
+        if (StrUtil.isNotEmpty(riskTemplate.getTemplateCode())) {
+            appId = riskTemplate.getTemplateCode().split("-")[0];
+        }
+        List<String> appIds = Collections.singletonList(appId);
+        YtApp ytApp = StrUtil.isEmpty(appId) ? null : appMapper.selectPrimary(appId);
+        YtPlatformUserApp platformUserApp = ytApp == null ? null : agentAppService.selectByPrimaryKey(ytApp.getSuperiorId());
+        if (platformUserApp == null && StrUtil.isNotEmpty(appId)) {
+            platformUserApp = agentAppService.selectByPrimaryKey(appId);
+            if (platformUserApp != null) {
+                List<YtApp> ytApps = appMapper.getBySuperiorId(platformUserApp.getAppId());
+                if (CollectionUtil.isNotEmpty(ytApps)) {
+                    appIds = ytApps.stream().map(YtApp::getAppId).collect(Collectors.toList());
+                }
+            }
+        }
+        if (platformUserApp == null || StrUtil.isEmpty(platformUserApp.getServiceName())) {
             return;
         }
         // 构建唯一ID,用于追踪
         String msgId = UUID.randomUUID().toString();
         CorrelationData correlationData = new CorrelationData(msgId);
+        String routingKey = StrUtil.format("{}.risk.biz.key", platformUserApp.getServiceName());
         try {
             // 尝试异步投递消息
-            rabbitTemplate.convertAndSend(RabbitMqConfig.RISK_BIZ_EXCHANGE, RabbitMqConfig.RISK_BIZ_ROUTING_KEY,
-                    appIds, correlationData);
+            rabbitTemplate.convertAndSend(RabbitMqConfig.RISK_BIZ_EXCHANGE, routingKey, appIds, correlationData);
             log.info("[rabbitmq]已通知更新");
         } catch (Exception e) {
             String retryCacheKey = StrUtil.format("risk:rabbitmq:{}:retry", msgId);
@@ -77,4 +112,4 @@ public class ReliableMessageSender {
             }
         }
     }
-}
+}

+ 2 - 2
yt-agent/agent-service/src/main/java/com/ytpm/service/impl/RiskServiceImpl.java

@@ -318,7 +318,7 @@ public class RiskServiceImpl implements RiskService {
                 param.getOperator(),param.getOperatorName()
         );
         //传入消息队列通知子服务更新
-        reliableMessageSender.sendRiskChangeToMq(Collections.singletonList(template.getAppId()));
+        reliableMessageSender.sendRiskChangeToMq(template.getTemplateId());
     }
 
     /**
@@ -373,7 +373,7 @@ public class RiskServiceImpl implements RiskService {
             riskMapper.addRiskTemplateLog(logs);
         }
         //传入消息队列通知子服务更新
-        reliableMessageSender.sendRiskChangeToMq(Collections.singletonList(template.getAppId()));
+        reliableMessageSender.sendRiskChangeToMq(param.getTemplateId());
         return Result.resultOk(RepMessage.MODIFY_SUCCESS);
     }
 

+ 22 - 16
yt-ios-lemon/lemon-ios-service/src/main/java/com/ytpm/lemonios/config/mq/RabbitMqConfig.java

@@ -5,18 +5,17 @@ import org.springframework.amqp.core.Binding;
 import org.springframework.amqp.core.BindingBuilder;
 import org.springframework.amqp.core.DirectExchange;
 import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import javax.annotation.Resource;
 import java.util.HashMap;
 import java.util.Map;
-import javax.annotation.Resource;
-
-import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
-import org.springframework.amqp.rabbit.connection.ConnectionFactory;
-import org.springframework.beans.factory.annotation.Qualifier;
 import java.util.concurrent.Executor;
 
 /**
@@ -67,13 +66,19 @@ public class RabbitMqConfig {
     private String dlxRoutingKey;
 
     // risk风控更新通知队列
-    public static final String RISK_BIZ_QUEUE = "risk.save.biz.queue";
-    public static final String RISK_BIZ_EXCHANGE = "risk.save.biz.exchange";
-    public static final String RISK_BIZ_ROUTING_KEY = "risk.save.biz.key";
+    public static final String RISK_BIZ_EXCHANGE = "risk.biz.exchange";
+    public static final String RISK_DLX_EXCHANGE = "risk.dlx.exchange";
+
+    @Value("${spring.rabbitmq.mq-config.risk-biz-queue}")
+    private String riskBizQueue;
+    @Value("${spring.rabbitmq.mq-config.risk-biz-routing-key}")
+    private String riskBizRoutingKey;
+
+    @Value("${spring.rabbitmq.mq-config.risk-dlx-queue}")
+    private String riskDlxQueue;
+    @Value("${spring.rabbitmq.mq-config.risk-dlx-routing-key}")
+    private String riskDlxRoutingKey;
 
-    public static final String RISK_DLX_QUEUE = "risk.save.dlx.queue";
-    public static final String RISK_DLX_EXCHANGE = "risk.save.dlx.exchange";
-    public static final String RISK_DLX_ROUTING_KEY = "risk.save.dlx.key";
 
     // ---------------- 声明死信队列 ----------------
     @Bean
@@ -93,7 +98,7 @@ public class RabbitMqConfig {
 
     @Bean
     public Queue riskDlxQueue() {
-        return new Queue(RISK_DLX_QUEUE, true);
+        return new Queue(riskDlxQueue, true);
     }
 
     @Bean
@@ -103,7 +108,7 @@ public class RabbitMqConfig {
 
     @Bean
     public Binding riskDlxBinding() {
-        return BindingBuilder.bind(riskDlxQueue()).to(riskDlxExchange()).with(RISK_DLX_ROUTING_KEY);
+        return BindingBuilder.bind(riskDlxQueue()).to(riskDlxExchange()).with(riskDlxRoutingKey);
     }
 
     // ---------------- 声明业务队列 ----------------
@@ -126,13 +131,14 @@ public class RabbitMqConfig {
         return BindingBuilder.bind(bizQueue()).to(bizExchange()).with(bizRoutingKey);
     }
 
+
     @Bean
     public Queue riskQueue() {
         Map<String, Object> args = new HashMap<>();
         // 动态指向该服务专属的死信交换机
         args.put("x-dead-letter-exchange", RISK_DLX_EXCHANGE);
-        args.put("x-dead-letter-routing-key", RISK_DLX_ROUTING_KEY);
-        return new Queue(RISK_BIZ_QUEUE, true, false, false, args);
+        args.put("x-dead-letter-routing-key", riskDlxRoutingKey);
+        return new Queue(riskBizQueue, true, false, false, args);
     }
 
     @Bean
@@ -142,7 +148,7 @@ public class RabbitMqConfig {
 
     @Bean
     public Binding riskBizBinding() {
-        return BindingBuilder.bind(bizQueue()).to(bizExchange()).with(RISK_BIZ_ROUTING_KEY);
+        return BindingBuilder.bind(riskQueue()).to(riskBizExchange()).with(riskBizRoutingKey);
     }
 
 }

+ 2 - 2
yt-ios-lemon/lemon-ios-service/src/main/java/com/ytpm/lemonios/config/mq/RiskChangeConsumer.java

@@ -23,13 +23,13 @@ import java.util.List;
 @Component
 public class RiskChangeConsumer {
 
-
     @Resource
     private RiskContent riskContent;
 
-    @RabbitListener(queues = RabbitMqConfig.RISK_BIZ_QUEUE, containerFactory = "customContainerFactory")
+    @RabbitListener(queues = "${spring.rabbitmq.mq-config.risk-biz-queue}", containerFactory = "customContainerFactory")
     public void handleMessage(List<String> appIds, Channel channel,
                               @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
+        log.info("[rabbitmq]RiskChangeConsumer ready to update local risk, appIds: {}", appIds);
         try {
             riskContent.updateLocalRisk(appIds);
             // false 表示只确认当前这一条