Pārlūkot izejas kodu

fix: 修复agent 修改风控归属人权限校验问题,增补mq投递延时重试key监听

hidewnd 2 dienas atpakaļ
vecāks
revīzija
0773612d64

+ 2 - 3
yt-agent/agent-service/src/main/java/com/ytpm/config/mq/ReliableMessageSender.java

@@ -18,7 +18,6 @@ import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
@@ -99,12 +98,12 @@ public class ReliableMessageSender {
             rabbitTemplate.convertAndSend(RabbitMqConfig.RISK_BIZ_EXCHANGE, routingKey, appIds, correlationData);
             log.info("[rabbitmq]已通知更新");
         } catch (Exception e) {
-            String retryCacheKey = StrUtil.format("risk:rabbitmq:{}:retry", msgId);
+            String cachekey = StrUtil.format("risk:rabbitmq:retry:{}", templateId);
+            String retryCacheKey = StrUtil.format("risk:rabbitmq:timeOutRetry:{}", templateId);
             int retryCount = redisService.hasKey(retryCacheKey) ? Integer.parseInt(redisService.getObj(retryCacheKey).toString()) : 0;
             if (retryCount < 3) {
                 // 如果连接不上MQ,10分钟后重新尝试
                 log.error("[rabbitmq]MQ发送请求异常,10分钟后重新尝试投递!ID: {}", msgId, e);
-                String cachekey = StrUtil.format("risk:rabbitmq:{}", msgId);
                 redisService.setTimeOutMinutesStr(cachekey, JSONArray.toJSONString(appIds), 10);
                 redisService.setTimeOutHoursStr(retryCacheKey, String.valueOf(retryCount + 1), 1);
             } else {

+ 56 - 0
yt-agent/agent-service/src/main/java/com/ytpm/config/redis/RedisKeyExpirationListener.java

@@ -0,0 +1,56 @@
+package com.ytpm.config.redis;
+
+
+import com.ytpm.config.mq.ReliableMessageSender;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.cloud.context.config.annotation.RefreshScope;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.lang.NonNull;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+/**
+ * @author hidewnd
+ * @date 2026/2/5 15:27
+ */
+@Component
+@RefreshScope
+@Slf4j(topic = "RedisKeyExpire")
+public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
+
+    @Resource
+    private ReliableMessageSender reliableMessageSender;
+
+    /**
+     * Creates new {@link } for {@code __keyevent@*__:expired} messages.
+     *
+     * @param listenerContainer must not be {@literal null}.
+     */
+    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
+        super(listenerContainer);
+    }
+
+
+    /**
+     * 监听redis过期的 key 进行处理
+     */
+    @Override
+    public void onMessage(@NonNull Message message, byte[] pattern) {
+        String key = String.valueOf(message);
+        // 监听risk:rabbitmq 事件
+        if (key.startsWith("risk:rabbitmq:retry")) {
+            handleRetrySendMsg(key);
+        }
+    }
+
+    private void handleRetrySendMsg(String key) {
+        String[] split = key.split(":");
+        String templateId = split[split.length - 1];
+        reliableMessageSender.sendRiskChangeToMq(templateId);
+    }
+
+
+}

+ 28 - 0
yt-agent/agent-service/src/main/java/com/ytpm/config/redis/RedisListenerConfig.java

@@ -0,0 +1,28 @@
+package com.ytpm.config.redis;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+
+/**
+ * redis 监听器配置
+ * @author marx
+ * @date 2025/7/29 16:09
+ */
+@Configuration
+public class RedisListenerConfig {
+
+    @Bean
+    RedisMessageListenerContainer listenerContainer(RedisConnectionFactory connectionFactory) {
+        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
+        listenerContainer.setConnectionFactory(connectionFactory);
+        return listenerContainer;
+    }
+
+    @Bean("agentRedisKeyListener")
+    KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
+        return new RedisKeyExpirationListener(listenerContainer);
+    }
+}

+ 10 - 2
yt-agent/agent-service/src/main/java/com/ytpm/service/impl/RiskServiceImpl.java

@@ -100,6 +100,9 @@ public class RiskServiceImpl implements RiskService {
     }
 
     private void checkAppPermission(YtApp ytApp, String userId) {
+        if (ytApp != null && StrUtil.isEmpty(ytApp.getSuperiorId())) {
+            ytApp = appMapper.selectPrimary(ytApp.getAppId());
+        }
         YtPlatformUserApp platformUserApp = ytApp == null ? null : agentAppMapper.selectByPrimaryKey(ytApp.getSuperiorId());
         if (platformUserApp != null && StringUtils.equals(platformUserApp.getUserId(), userId)) {
             return;
@@ -440,7 +443,6 @@ public class RiskServiceImpl implements RiskService {
         if (CollUtil.isEmpty(param.getConfigList())) {
             return Result.resultErr(RepMessage.COLLECTION_EMPTY);
         }
-        checkAppPermission(curUser, param.getAppId());
         List<YtRiskConfig> configs = new ArrayList<>();
         YtRiskConfig config;
         StringBuilder content = new StringBuilder();
@@ -493,7 +495,13 @@ public class RiskServiceImpl implements RiskService {
         if (Objects.isNull(old)) {
             return Result.resultErr(RepMessage.OBJECT_NOT_EXIST);
         }
-        checkAppPermission(curUser, param.getAppId());
+        String appId = old.getAppId();
+        if (StrUtil.isEmpty(appId) && StrUtil.isNotEmpty(old.getTemplateCode())) {
+            appId = old.getTemplateCode().split("-")[0];
+        }
+        if (StrUtil.isNotEmpty(appId)) {
+            checkAppPermission(curUser, appId);
+        }
         // 获取原有配置信息
         List<YtRiskConfig> configs = riskMapper.selectRiskConfig(param.getTemplateId());
         Map<String, String> valueMap = configs.stream().collect(Collectors.toMap(