前面的文章讲了Redis中如何使用Lua脚本实现原子操作,这次来分享一个更实用的例子:令牌桶限流,这个限流可以用来限制某个应用或接口的流入流量,也可以用来控制调用外部接口的qps。接下来我们通过模拟一个场景来看看具体是如何实现的。
假设场景
我的系统需要调用多个外部系统接口,但是外部的接口都限制了给我调用的qps,所以我不能无限制的去调这些接口,需要有个限流的控件来将我调用接口的频率限制在给定的qps值内。
设计方案
由于我的系统是分布式系统,并且每个节点都是集群部署的,所以限流不能只做成单机的,要做成集群可用的,这就需要借助于外部中间件,这里我们选择Redis,算法采用令牌桶算法。
令牌桶算法简介
令牌桶算法是一种用于流量控制的经典算法,常被用于限制系统的请求速率。该算法维护一个令牌桶,以固定的速率往令牌桶中放入令牌,每个令牌代表一个可用的请求。当请求到达时,如果桶中有足够的令牌,允许请求通过并消耗一个令牌;如果桶中没有足够的令牌,则拒绝该请求。
以下是令牌桶算法的基本原理:
令牌生成: 以固定的速率生成令牌,并放入令牌桶中。生成的速率可以表示为每秒生成的令牌数量。 令牌消耗: 每当有请求到达时,如果令牌桶中有足够的令牌,则允许请求通过,并从令牌桶中消耗一个令牌。如果令牌桶中没有足够的令牌,则拒绝请求。 令牌桶容量: 令牌桶还有一个容量上限,即最大可存放的令牌数量。如果桶满了,新生成的令牌也不会超过这个容量。
这种方式可以有效控制系统的请求速率,防止突发大流量对系统造成压力。令牌桶算法的优点在于对突发流量的处理比较灵活,可以通过调整生成令牌的速率和桶的容量来适应不同的场景。
Redis键值设计
我们使用Redis的Hash结构来构造一个对象,Key的命名规则为【rate_limiter_+接口别名】,Value为以下两个字段
last_time:13位毫秒级时间戳,上次请求获取令牌的时间stored_token:令牌桶中当前存储可用的令牌数量
lua脚本的
KEYS:
Redis中Hash结构的Key
ARGV:
qps:接口的qps值,当做令牌桶的最大容量add_rate:往令牌桶中添加令牌的速率,单位毫秒current_time:当前13位时间戳
借助这个hash结构使用令牌桶算法来控制请求接口的qps。每次调接口就去获取一个令牌,如果能获取到就放行,如果获取不到就等待重试。
每次调用时,比较当前时间戳和 last_time 的差值,根据 add_rate 计算这段时间应该产生几个令牌,并放到桶中,放满为止。计算完后,如果stored_token够本次需要的令牌数(固定为1)则将stored_token减一,返回成功(1)。如果数量不够的话,返回失败(0)。
代码实现
lua脚本 rate_limiter.lua
local api_key = KEYS[1] -- 限流hash结构的key
local qps = ARGV[1] -- 接口的qps
local add_rate = ARGV[2] -- 往令牌桶中添加令牌的速率,单位毫秒
local current_time = ARGV[3] -- 当前13位时间戳
local cache = redis.call('HMGET', api_key, 'stored_token', 'last_time')
local stored_token = cache[1]
local last_time = cache[2]
-- 如果第一次调用,Hash结构还不存在,则令牌桶中的令牌数从0算起
if stored_token then
-- 和上次调用时间比较,计算这段时间应产生的令牌数
local calculate_token_num = (current_time - last_time) / add_rate
stored_token = math.min(stored_token + calculate_token_num, qps)
else
stored_token = 0
end
if stored_token >= 1 then
redis.call('HMSET', api_key, 'stored_token', stored_token - 1, 'last_time', current_time)
return 1
else
redis.call('HMSET', api_key, 'stored_token', stored_token, 'last_time', current_time)
return 0
end
java代码示例
@SpringBootTest
@Slf4j
public class LuaRateLimiterTest {
@Autowired
RedisTemplate
public static final String RATE_LIMITER_KEY_PREFIX = "rate_limiter_";
@Test
public void rateLimiterTest() throws InterruptedException {
// 热身一下redis连接
warmRedis();
// 假定api接口名为getUser, qps被限制为5
String apiName = "getUser";
Integer qps = 5;
// 模拟并发调用20次接口
CountDownLatch countDownLatch = new CountDownLatch(20);
log.info("开始调用,{}", DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_PATTERN));
for (int i = 1; i <= 20; i++) {
int finalI = i;
CompletableFuture.runAsync(() -> {
while (true) {
if (acquireToken(apiName, qps)) {
log.info("调用编号:{},调用成功时间:{}", finalI, DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_PATTERN));
break;
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
countDownLatch.countDown();
});
}
countDownLatch.await();
log.info("调用结束");
}
/**
* 请求令牌
*/
private boolean acquireToken(String apiName, Integer qps) {
String key = RATE_LIMITER_KEY_PREFIX + apiName;
// 计算令牌生成的速率,单位毫秒
long addRate = 1000L / qps;
long currentTime = System.currentTimeMillis();
DefaultRedisScript
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/rate_limiter.lua")));
redisScript.setResultType(Long.class);
Long result = redisTemplate.execute(redisScript, Collections.singletonList(key), qps, addRate, currentTime);
return result != null && result == 1d;
}
/**
* 正常工作前,热身redis连接
*/
private void warmRedis() {
for (int i = 0; i < 10; i++) {
Object o = redisTemplate.opsForValue().get("aaa");
}
}
}
结果打印
首次执行
开始调用,2023-12-29 03:11:54.021
调用编号:9,调用成功时间:2023-12-29 03:11:54.241
调用编号:1,调用成功时间:2023-12-29 03:11:54.442
调用编号:7,调用成功时间:2023-12-29 03:11:54.646
调用编号:10,调用成功时间:2023-12-29 03:11:54.842
调用编号:8,调用成功时间:2023-12-29 03:11:55.040
调用编号:18,调用成功时间:2023-12-29 03:11:55.261
调用编号:19,调用成功时间:2023-12-29 03:11:55.441
调用编号:3,调用成功时间:2023-12-29 03:11:55.641
调用编号:4,调用成功时间:2023-12-29 03:11:55.844
调用编号:12,调用成功时间:2023-12-29 03:11:56.053
调用编号:5,调用成功时间:2023-12-29 03:11:56.248
调用编号:15,调用成功时间:2023-12-29 03:11:56.442
调用编号:14,调用成功时间:2023-12-29 03:11:56.644
调用编号:2,调用成功时间:2023-12-29 03:11:56.843
调用编号:11,调用成功时间:2023-12-29 03:11:57.052
调用编号:20,调用成功时间:2023-12-29 03:11:57.256
调用编号:16,调用成功时间:2023-12-29 03:11:57.446
调用编号:6,调用成功时间:2023-12-29 03:11:57.644
调用编号:13,调用成功时间:2023-12-29 03:11:57.858
调用编号:17,调用成功时间:2023-12-29 03:11:58.055
调用结束
可以看到每次请求的间隔都在200毫秒左右,满足我们qps=5的设定。
再次执行
开始调用,2023-12-29 10:01:31.813
调用编号:4,调用成功时间:2023-12-29 10:01:31.905
调用编号:1,调用成功时间:2023-12-29 10:01:31.905
调用编号:5,调用成功时间:2023-12-29 10:01:31.905
调用编号:2,调用成功时间:2023-12-29 10:01:31.905
调用编号:7,调用成功时间:2023-12-29 10:01:31.905
调用编号:12,调用成功时间:2023-12-29 10:01:32.054
调用编号:6,调用成功时间:2023-12-29 10:01:32.267
调用编号:3,调用成功时间:2023-12-29 10:01:32.445
调用编号:14,调用成功时间:2023-12-29 10:01:32.646
调用编号:13,调用成功时间:2023-12-29 10:01:32.850
调用编号:17,调用成功时间:2023-12-29 10:01:33.067
调用编号:8,调用成功时间:2023-12-29 10:01:33.271
调用编号:15,调用成功时间:2023-12-29 10:01:33.486
调用编号:20,调用成功时间:2023-12-29 10:01:33.683
调用编号:18,调用成功时间:2023-12-29 10:01:33.884
调用编号:16,调用成功时间:2023-12-29 10:01:34.086
调用编号:10,调用成功时间:2023-12-29 10:01:34.265
调用编号:11,调用成功时间:2023-12-29 10:01:34.461
调用编号:9,调用成功时间:2023-12-29 10:01:34.653
调用编号:19,调用成功时间:2023-12-29 10:01:34.877
调用结束
由于和上次执行有时间间隔,期间令牌桶被放满了,所以本次调用的前5个令牌请求直接就成功了,后续仍是200毫秒放行一个请求。
完活儿,这个限流算法是一个经典算法了,使用Redis来完成这个算法对于目前流行的分布式集群应用也很是实用,需要的同学代码自取,谢谢观看!
推荐阅读
发表评论