فهرست منبع

fix: 修复ads首页统计超时问题

hidewnd 3 روز پیش
والد
کامیت
f990a11d38

+ 47 - 0
yt-middle/middle-platform/src/main/java/com/ytpm/middle/config/ThreadPoolConfig.java

@@ -0,0 +1,47 @@
+package com.ytpm.middle.config;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+@Configuration
+public class ThreadPoolConfig {
+
+    @Value("${spring.custom.executor.core-pool-size:10}")
+    private Integer corePoolSize;
+
+    @Value("${spring.custom.executor.max-pool-size:100}")
+    private Integer maxPoolSize;
+
+    @Value("${spring.custom.executor.queue-capacity:1024}")
+    private Integer queueCapacity;
+
+    @Value("${spring.custom.executor.keep-alive-seconds:60}")
+    private Integer keepAliveSeconds;
+
+    @Bean("customExecutor")
+    public ThreadPoolTaskExecutor customExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        // 核心线程数
+        executor.setCorePoolSize(corePoolSize);
+        // 最大线程数
+        executor.setMaxPoolSize(maxPoolSize);
+        // 队列容量
+        executor.setQueueCapacity(queueCapacity);
+        // 线程活跃时间(秒)
+        executor.setKeepAliveSeconds(keepAliveSeconds);
+        // 线程名称前缀
+        executor.setThreadNamePrefix("custom-pool-");
+        // 拒绝策略:由调用者所在的线程来执行
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        executor.setWaitForTasksToCompleteOnShutdown(true);
+        executor.setAwaitTerminationSeconds(30);
+        // 初始化
+        executor.initialize();
+        return executor;
+    }
+
+}

+ 83 - 25
yt-middle/middle-platform/src/main/java/com/ytpm/middle/service/impl/CountServiceImpl.java

@@ -26,8 +26,10 @@ import com.ytpm.middle.view.DashboardTopCountVo;
 import com.ytpm.middle.view.UserRankingListVO;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.cloud.context.config.annotation.RefreshScope;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
@@ -37,6 +39,8 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -62,6 +66,10 @@ public class CountServiceImpl implements CountService {
     @Resource
     private AppMapper appMapper;
 
+    @Resource
+    @Qualifier("customExecutor")
+    private ThreadPoolTaskExecutor customExecutor;
+
     /**
      * 获取顶部数据统计
      */
@@ -75,17 +83,36 @@ public class CountServiceImpl implements CountService {
                 .collect(Collectors.toMap(FeignServeAppView::getServiceName, FeignServeAppView::getAppIds));
         BigDecimal resultAdCount = new BigDecimal(0);
         BigDecimal resultRevenue = new BigDecimal(0);
-        for (Map.Entry<String, String> entry : serveMap.entrySet()) {
-            Object o;
+        List<CompletableFuture<JSONObject>> futures = serveMap.entrySet().stream()
+                .map(entry -> CompletableFuture.supplyAsync(() -> {
+                    try {
+                        Object o = feignInvoker.invoke(entry.getKey(), "getAdCount", entry.getValue());
+                        return JSONObject.parseObject(JSON.toJSONString(o));
+                    } catch (Exception e) {
+                        log.error("Service {} getAdCount error: {}", entry.getKey(), e.getMessage());
+                        return null;
+                    }
+                }, customExecutor))
+                .collect(Collectors.toList());
+
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+
+        for (CompletableFuture<JSONObject> future : futures) {
             try {
-                o = feignInvoker.invoke(entry.getKey(), "getAdCount",entry.getValue());
-            }catch (Exception e){
-                log.error(e.getMessage(),e);
-                continue;
+                JSONObject object = future.get();
+                if (object != null) {
+                    BigDecimal adCount = object.getBigDecimal("adCount");
+                    BigDecimal revenue = object.getBigDecimal("expectRevenue");
+                    if (adCount != null) {
+                        resultAdCount = resultAdCount.add(adCount);
+                    }
+                    if (revenue != null) {
+                        resultRevenue = resultRevenue.add(revenue);
+                    }
+                }
+            } catch (Exception e) {
+                log.error(e.getMessage(), e);
             }
-            JSONObject object = JSONObject.parseObject(JSON.toJSONString(o));
-            resultAdCount = resultAdCount.add(object.getBigDecimal("adCount"));
-            resultRevenue = resultRevenue.add(object.getBigDecimal("expectRevenue"));
         }
         vo.setAdCount(resultAdCount.intValue());
         vo.setExpectRevenue(resultRevenue);
@@ -100,9 +127,10 @@ public class CountServiceImpl implements CountService {
         //判断redis中是否存在排行榜信息 不存在则查询数据进行更新,存在则直接取出
         if(Boolean.TRUE.equals(redisUtil.hasKey(RANKING_KEY+sortBy))){
             String ranking = redisUtil.getStr(RANKING_KEY+sortBy);
-            long expire = redisUtil.getExpire(RANKING_KEY + sortBy);
-            if(Math.subtractExact(rankingExpire,expire)<2){
-                getRankingAnCache(sortBy);
+            Long expire = redisUtil.getExpire(RANKING_KEY + sortBy);
+            // 缓存即将过期时(剩余时间小于2分钟),异步刷新缓存
+            if(expire != null && expire > 0 && expire < 2){
+                CompletableFuture.runAsync(() -> getRankingAnCache(sortBy), customExecutor);
             }
             return Result.resultObjOk(JSON.parseObject(ranking, DashboardRankingListVO.class));
         }
@@ -120,18 +148,45 @@ public class CountServiceImpl implements CountService {
         List<AppRankingListVO> appRankingList = new ArrayList<>();
         List<UserRankingListVO> userRankingList = new ArrayList<>();
         boolean hasError = false;
-        for (String serve : serveList) {
-            Object o;
-            try{
-                o = feignInvoker.invoke(serve,"queryRankingList",sortBy,rankingLimit);
-            }catch (Exception e){
-                log.error(e.getMessage(),e);
+        List<CompletableFuture<DashboardRankingListVO>> futures = serveList.stream()
+                .map(serve -> CompletableFuture.supplyAsync(() -> {
+                    try {
+                        Object o = feignInvoker.invoke(serve, "queryRankingList", sortBy, rankingLimit);
+                        return JSONObject.parseObject(JSON.toJSONString(o), DashboardRankingListVO.class);
+                    } catch (Exception e) {
+                        log.error("Service {} queryRankingList error: {}", serve, e.getMessage());
+                        return null;
+                    }
+                }, customExecutor))
+                .collect(Collectors.toList());
+
+        // 设置总超时时间,例如 20 秒,避免长时间阻塞
+        try {
+            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(20, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.error("Wait for all ranking list tasks failed or timeout: {}", e.getMessage());
+            // 超时或异常时不抛出,继续处理已完成的任务
+        }
+
+        for (CompletableFuture<DashboardRankingListVO> future : futures) {
+            try {
+                // 对于未完成的任务,getNow(null) 会立即返回 null
+                DashboardRankingListVO vo = future.getNow(null);
+                if (vo != null) {
+                    if (CollUtil.isNotEmpty(vo.getAppRankingList())) {
+                        appRankingList.addAll(vo.getAppRankingList());
+                    }
+                    if (CollUtil.isNotEmpty(vo.getUserRankingList())) {
+                        userRankingList.addAll(vo.getUserRankingList());
+                    }
+                } else {
+                    // 任务未完成(超时)或返回null(异常)都视为失败
+                    hasError = true;
+                }
+            } catch (Exception e) {
+                log.error(e.getMessage(), e);
                 hasError = true;
-                continue;
             }
-            DashboardRankingListVO vo = JSONObject.parseObject(JSON.toJSONString(o), DashboardRankingListVO.class);
-            appRankingList.addAll(vo.getAppRankingList());
-            userRankingList.addAll(vo.getUserRankingList());
         }
         //设置应用名称
         Map<String, String> collect = views.stream().collect(Collectors.toMap(AgentAppView::getAppId, AgentAppView::getAppName));
@@ -142,9 +197,12 @@ public class CountServiceImpl implements CountService {
                 .reversed()).limit(5).collect(Collectors.toList()));
         rankingVO.setAppRankingList(appRankingList.stream().sorted(Comparator.comparing(AppRankingListVO::getTotalRevenue)
                 .reversed()).limit(5).collect(Collectors.toList()));
-        if(!hasError){
-            redisUtil.setTimeOutMinutesStr(RANKING_KEY+sortBy, JSON.toJSONString(rankingVO),rankingExpire);
-        }
+        
+        // 即使有部分服务超时或失败,也进行缓存,避免每次请求都等待超时
+        // 如果有错误,缓存时间缩短为5分钟,以便尽快恢复;无错误则正常缓存
+        long expireTime = hasError ? 5 : rankingExpire;
+        redisUtil.setTimeOutMinutesStr(RANKING_KEY+sortBy, JSON.toJSONString(rankingVO), expireTime);
+        
         return rankingVO;
     }