Browse Source

FeignClientInvoker 优化
风控 ScheduledExecutor调度资源优化

marxjaw 1 month ago
parent
commit
37fee6246b

+ 0 - 4
yt-risk/risk-manage/src/main/java/com/ytpm/RiskManageApplication.java

@@ -24,8 +24,4 @@ public class RiskManageApplication
     {
         SpringApplication.run(RiskManageApplication.class, args);
     }
-    @Bean(destroyMethod = "shutdown")
-    public ScheduledExecutorService riskScheduledExecutor() {
-        return new ScheduledThreadPoolExecutor(5, r -> new Thread(r, "risk-scheduler"), new ThreadPoolExecutor.DiscardPolicy());
-    }
 }

+ 28 - 0
yt-risk/risk-manage/src/main/java/com/ytpm/config/SchedulerConfig.java

@@ -0,0 +1,28 @@
+package com.ytpm.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+
+@Configuration
+public class SchedulerConfig {
+
+    @Bean(destroyMethod = "shutdown")
+    public ScheduledExecutorService riskScheduledExecutorService() {
+        ThreadFactory factory = r -> {
+            Thread t = new Thread(r);
+            t.setName("risk-scheduler");
+            t.setDaemon(true);
+            return t;
+        };
+        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, factory, new ThreadPoolExecutor.DiscardPolicy());
+        executor.setRemoveOnCancelPolicy(true);
+        return executor;
+    }
+}
+
+

+ 1 - 4
yt-risk/risk-manage/src/main/java/com/ytpm/service/impl/RiskServiceImpl.java

@@ -83,7 +83,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -1053,8 +1052,7 @@ public class RiskServiceImpl implements RiskService {
                     YtApp ytApp = appMapper.selectRiskApp(dyzUser.getAppId());
                     userApp =  appMapper.selectParentApp(ytApp.getSuperiorId());
                 }
-                ScheduledExecutorService scheduled  = Executors.newSingleThreadScheduledExecutor();
-                scheduled.schedule(()->{
+                scheduledExecutorService.schedule(()->{
                     YtDyzUser next = new YtDyzUser();
                     next.setUserId(dyzUser.getUserId());
                     next.setUserStatus(UserStatusEnum.VISITOR_LOCK.getCode());
@@ -1063,7 +1061,6 @@ public class RiskServiceImpl implements RiskService {
                     feignInvoker.invoke(userApp.getServiceName(), "updateUserInfo",next);
                 },300, TimeUnit.MILLISECONDS);
                 //修改为解锁用户存入redis 24小时后进行解锁
-                scheduled.shutdown();
                 log.debug(StrUtil.format("[checkLoginRisk] appId:{} (end - start):{}",
                         dyzUser.getAppId(), System.currentTimeMillis() - start));
                 return Result.resultErr(RepMessage.RISK_VISITOR_LOWER_VALUE);

+ 120 - 37
yt-risk/risk-manage/src/main/java/com/ytpm/util/FeignClientInvoker.java

@@ -3,10 +3,17 @@ package com.ytpm.util;
 import org.springframework.cloud.openfeign.FeignClient;
 import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Component;
+import org.springframework.beans.factory.annotation.Value;
 
 import java.lang.reflect.Method;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
 
 /**
  * FeignClient公共调用类
@@ -19,11 +26,23 @@ public class FeignClientInvoker {
     private final ApplicationContext applicationContext;
     private final Map<String, Object> feignClientCache = new ConcurrentHashMap<>();
     private final Map<String, Method> methodCache = new ConcurrentHashMap<>();
+    @Value("${risk.feign.timeout-ms:2000}")
+    private int feignTimeoutMs;
+    @Value("${risk.feign.max-concurrent:50}")
+    private int maxConcurrent;
+    private Semaphore bulkhead;
+    @Resource
+    private ScheduledExecutorService scheduledExecutorService;
 
     public FeignClientInvoker(ApplicationContext applicationContext) {
         this.applicationContext = applicationContext;
     }
 
+    @PostConstruct
+    public void init() {
+        this.bulkhead = new Semaphore(Math.max(1, maxConcurrent), true);
+    }
+
     /**
      * 动态调用FeignClient方法
      * @param serviceName FeignClient的name属性值
@@ -32,58 +51,122 @@ public class FeignClientInvoker {
      * @return 调用结果
      */
     public Object invoke(String serviceName, String methodName, Object... args) {
+        boolean acquired = bulkhead.tryAcquire();
+        if (!acquired) {
+            throw new RuntimeException("调用FeignClient被限流(并发超限): " + serviceName);
+        }
         try {
-            // 1. 获取FeignClient实例
             Object feignClient = getFeignClient(serviceName);
-            
-            // 2. 获取目标方法
             Method method = getTargetMethod(feignClient, methodName, args);
-            
-            // 3. 执行方法调用
-            return method.invoke(feignClient, args);
+            return CompletableFuture.supplyAsync(() -> {
+                try {
+                    return method.invoke(feignClient, args);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }, scheduledExecutorService).get(feignTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (java.util.concurrent.TimeoutException te) {
+            throw new RuntimeException("调用FeignClient超时: " + serviceName + ", 方法: " + methodName);
         } catch (Exception e) {
             throw new RuntimeException("调用FeignClient失败: " + serviceName, e);
+        } finally {
+            bulkhead.release();
         }
     }
 
     private Object getFeignClient(String serviceName) {
-        // 优先从缓存获取
-        if (feignClientCache.containsKey(serviceName)) {
-            return feignClientCache.get(serviceName);
+        return feignClientCache.computeIfAbsent(serviceName, name -> {
+            Map<String, Object> feignClients = applicationContext.getBeansWithAnnotation(FeignClient.class);
+            for (Object bean : feignClients.values()) {
+                Class<?>[] interfaces = bean.getClass().getInterfaces();
+                for (Class<?> iface : interfaces) {
+                    FeignClient annotation = iface.getAnnotation(FeignClient.class);
+                    if (annotation != null) {
+                        String annName = annotation.name().isEmpty() ? annotation.value() : annotation.name();
+                        if (name.equals(annName)) {
+                            return bean;
+                        }
+                    }
+                }
+            }
+            throw new IllegalArgumentException("未找到服务: " + name);
+        });
+    }
+
+    private Method getTargetMethod(Object feignClient, String methodName, Object[] args)
+        throws NoSuchMethodException {
+        Class<?> proxyClass = feignClient.getClass();
+        Class<?>[] interfaces = proxyClass.getInterfaces();
+
+        Method resolved = null;
+        for (Class<?> iface : interfaces) {
+            resolved = findCompatibleMethod(iface, methodName, args);
+            if (resolved != null) {
+                String cacheKey = iface.getName() + "#" + resolved.getName() + signatureOf(resolved.getParameterTypes());
+                Method cached = methodCache.putIfAbsent(cacheKey, resolved);
+                return cached != null ? cached : resolved;
+            }
         }
-        
-        // 动态查找标注@FeignClient的Bean
-        Map<String, Object> feignClients = applicationContext.getBeansWithAnnotation(FeignClient.class);
-        for (Object bean : feignClients.values()) {
-            FeignClient annotation = bean.getClass().getInterfaces()[0].getAnnotation(FeignClient.class);
-            if (annotation != null && serviceName.equals(annotation.name())) {
-                feignClientCache.put(serviceName, bean);
-                return bean;
+        // 兜底:在代理类上尝试(不推荐,但避免因代理结构差异导致失败)
+        resolved = findCompatibleMethod(proxyClass, methodName, args);
+        if (resolved != null) {
+            String cacheKey = proxyClass.getName() + "#" + resolved.getName() + signatureOf(resolved.getParameterTypes());
+            Method cached = methodCache.putIfAbsent(cacheKey, resolved);
+            return cached != null ? cached : resolved;
+        }
+        throw new NoSuchMethodException("未找到方法: " + methodName);
+    }
+
+    private Method findCompatibleMethod(Class<?> type, String methodName, Object[] args) {
+        Method[] methods = type.getMethods();
+        for (Method m : methods) {
+            if (!m.getName().equals(methodName)) {
+                continue;
+            }
+            Class<?>[] paramTypes = m.getParameterTypes();
+            if (paramTypes.length != (args == null ? 0 : args.length)) {
+                continue;
+            }
+            boolean match = true;
+            for (int i = 0; i < paramTypes.length; i++) {
+                Object arg = args[i];
+                if (arg == null) {
+                    continue; // 允许null匹配任意引用类型
+                }
+                if (!wrap(paramTypes[i]).isAssignableFrom(arg.getClass())) {
+                    match = false;
+                    break;
+                }
+            }
+            if (match) {
+                return m;
             }
         }
-        throw new IllegalArgumentException("未找到服务: " + serviceName);
+        return null;
     }
 
-    private Method getTargetMethod(Object feignClient, String methodName, Object[] args) 
-        throws NoSuchMethodException {
-        
-        String cacheKey = feignClient.getClass().getName() + "#" + methodName;
-        
-        // 缓存命中直接返回
-        if (methodCache.containsKey(cacheKey)) {
-            return methodCache.get(cacheKey);
+    private Class<?> wrap(Class<?> type) {
+        if (!type.isPrimitive()) {
+            return type;
         }
-        
-        // 获取参数类型
-        Class<?>[] argTypes = new Class[args.length];
-        for (int i = 0; i < args.length; i++) {
-            argTypes[i] = args[i].getClass();
+        if (type == int.class) return Integer.class;
+        if (type == long.class) return Long.class;
+        if (type == boolean.class) return Boolean.class;
+        if (type == double.class) return Double.class;
+        if (type == float.class) return Float.class;
+        if (type == char.class) return Character.class;
+        if (type == byte.class) return Byte.class;
+        if (type == short.class) return Short.class;
+        return type;
+    }
+
+    private String signatureOf(Class<?>[] paramTypes) {
+        StringBuilder sb = new StringBuilder("(");
+        for (int i = 0; i < paramTypes.length; i++) {
+            if (i > 0) sb.append(",");
+            sb.append(paramTypes[i].getName());
         }
-        
-        // 反射获取方法
-        Method method = feignClient.getClass().getMethod(methodName, argTypes);
-        methodCache.put(cacheKey, method);
-        
-        return method;
+        sb.append(")");
+        return sb.toString();
     }
 }