详解Spring Cloud Gateway 限流操作
开发高并发系统时有三把利器用来保护系统:缓存、降级和限流。 API网关作为所有请求的入口,请求量大,我们可以通过对并发访问的请求进行限速来保护系统的可用性。 常用的限流算法比如有令牌桶算法,漏桶算法,计数器算法等。 在Zuul中我们可以自己去实现限流的功能 (Zuul中如何限流在我的书 《Spring Cloud微服务-全栈技术与案例解析》 中有详细讲解) ,Spring Cloud Gateway的出现本身就是用来替代Zuul的。 要想替代那肯定得有强大的功能,除了性能上的优势之外,Spring Cloud Gateway还提供了很多新功能,比如今天我们要讲的限流操作,使用起来非常简单,今天我们就来学习在如何在Spring Cloud Gateway中进行限流操作。 目前限流提供了基于Redis的实现,我们需要增加对应的依赖: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency> 可以通过KeyResolver来指定限流的Key,比如我们需要根据用户来做限流,IP来做限流等等。 IP限流 @Bean public KeyResolver ipKeyResolver() { return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName()); } 通过exchange对象可以获取到请求信息,这边用了HostName,如果你想根据用户来做限流的话这边可以获取当前请求的用户ID或者用户名就可以了,比如: 用户限流 使用这种方式限流,请求路径中必须携带userId参数。 @Bean KeyResolver userKeyResolver() { return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("userId")); } 接口限流 获取请求地址的uri作为限流key。 @Bean KeyResolver apiKeyResolver() { return exchange -> Mono.just(exchange.getRequest().getPath().value()); } 然后配置限流的过滤器信息: server: port: 8084 spring: redis: host: 127.0.0.1 port: 6379 cloud: gateway: routes: - id: fsh-house uri: lb://fsh-house predicates: - Path=/house/** filters: - name: RequestRateLimiter args: redis-rate-limiter.replenishRate: 10 redis-rate-limiter.burstCapacity: 20 key-resolver: "#{@ipKeyResolver}"
可以访问接口进行测试,这时候Redis中会有对应的数据: 127.0.0.1:6379> keys * 大括号中就是我们的限流Key,这边是IP,本地的就是localhost
Spring Cloud Gateway目前提供的限流还是相对比较简单的,在实际中我们的限流策略会有很多种情况,比如:
当然我们也可以通过重新RedisRateLimiter来实现自己的限流策略,这个我们后面再进行介绍。 限流源码 // routeId也就是我们的fsh-house,id就是限流的key,也就是localhost。 public Mono<Response> isAllowed(String routeId,String id) { // 会判断RedisRateLimiter是否初始化了 if (!this.initialized.get()) { throw new IllegalStateException("RedisRateLimiter is not initialized"); } // 获取routeId对应的限流配置 Config routeConfig = getConfig().getOrDefault(routeId,defaultConfig); if (routeConfig == null) { throw new IllegalArgumentException("No Configuration found for route " + routeId); } // 允许用户每秒做多少次请求 int replenishRate = routeConfig.getReplenishRate(); // 令牌桶的容量,允许在一秒钟内完成的最大请求数 int burstCapacity = routeConfig.getBurstCapacity(); try { // 限流key的名称(request_rate_limiter.{localhost}.timestamp,request_rate_limiter.{localhost}.tokens) List<String> keys = getKeys(id); // The arguments to the LUA script. time() returns unixtime in seconds. List<String> scriptArgs = Arrays.asList(replenishRate + "",burstCapacity + "",Instant.now().getEpochSecond() + "","1"); // allowed,tokens_left = redis.eval(SCRIPT,keys,args) // 执行LUA脚本 Flux<List<Long>> flux = this.redisTemplate.execute(this.script,scriptArgs); // .log("redisratelimiter",Level.FINER); return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L,-1L))) .reduce(new ArrayList<Long>(),(longs,l) -> { longs.addAll(l); return longs; }) .map(results -> { boolean allowed = results.get(0) == 1L; Long tokensLeft = results.get(1); Response response = new Response(allowed,getHeaders(routeConfig,tokensLeft)); if (log.isDebugEnabled()) { log.debug("response: " + response); } return response; }); } catch (Exception e) { log.error("Error determining if user allowed from redis",e); } return Mono.just(new Response(true,-1L))); } LUA脚本在: local tokens_key = KEYS[1] local timestamp_key = KEYS[2] --redis.log(redis.LOG_WARNING,"tokens_key " .. tokens_key) local rate = tonumber(ARGV[1]) local capacity = tonumber(ARGV[2]) local now = tonumber(ARGV[3]) local requested = tonumber(ARGV[4]) local fill_time = capacity/rate local ttl = math.floor(fill_time*2) --redis.log(redis.LOG_WARNING,"rate " .. ARGV[1]) --redis.log(redis.LOG_WARNING,"capacity " .. ARGV[2]) --redis.log(redis.LOG_WARNING,"now " .. ARGV[3]) --redis.log(redis.LOG_WARNING,"requested " .. ARGV[4]) --redis.log(redis.LOG_WARNING,"filltime " .. fill_time) --redis.log(redis.LOG_WARNING,"ttl " .. ttl) local last_tokens = tonumber(redis.call("get",tokens_key)) if last_tokens == nil then last_tokens = capacity end --redis.log(redis.LOG_WARNING,"last_tokens " .. last_tokens) local last_refreshed = tonumber(redis.call("get",timestamp_key)) if last_refreshed == nil then last_refreshed = 0 end --redis.log(redis.LOG_WARNING,"last_refreshed " .. last_refreshed) local delta = math.max(0,now-last_refreshed) local filled_tokens = math.min(capacity,last_tokens+(delta*rate)) local allowed = filled_tokens >= requested local new_tokens = filled_tokens local allowed_num = 0 if allowed then new_tokens = filled_tokens - requested allowed_num = 1 end --redis.log(redis.LOG_WARNING,"delta " .. delta) --redis.log(redis.LOG_WARNING,"filled_tokens " .. filled_tokens) --redis.log(redis.LOG_WARNING,"allowed_num " .. allowed_num) --redis.log(redis.LOG_WARNING,"new_tokens " .. new_tokens) redis.call("setex",tokens_key,ttl,new_tokens) redis.call("setex",timestamp_key,now) return { allowed_num,new_tokens } 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程小技巧。 您可能感兴趣的文章:
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |