前面的文章讲了Redis中如何使用Lua脚本实现原子操作,这次来分享一个更实用的例子:令牌桶限流,这个限流可以用来限制某个应用或接口的流入流量,也可以用来控制调用外部接口的qps。接下来我们通过模拟一个场景来看看具体是如何实现的。

假设场景

我的系统需要调用多个外部系统接口,但是外部的接口都限制了给我调用的qps,所以我不能无限制的去调这些接口,需要有个限流的控件来将我调用接口的频率限制在给定的qps值内。

设计方案

由于我的系统是分布式系统,并且每个节点都是集群部署的,所以限流不能只做成单机的,要做成集群可用的,这就需要借助于外部中间件,这里我们选择Redis,算法采用令牌桶算法。

令牌桶算法简介

令牌桶算法是一种用于流量控制的经典算法,常被用于限制系统的请求速率。该算法维护一个令牌桶,以固定的速率往令牌桶中放入令牌,每个令牌代表一个可用的请求。当请求到达时,如果桶中有足够的令牌,允许请求通过并消耗一个令牌;如果桶中没有足够的令牌,则拒绝该请求。

以下是令牌桶算法的基本原理:

令牌生成: 以固定的速率生成令牌,并放入令牌桶中。生成的速率可以表示为每秒生成的令牌数量。 令牌消耗: 每当有请求到达时,如果令牌桶中有足够的令牌,则允许请求通过,并从令牌桶中消耗一个令牌。如果令牌桶中没有足够的令牌,则拒绝请求。 令牌桶容量: 令牌桶还有一个容量上限,即最大可存放的令牌数量。如果桶满了,新生成的令牌也不会超过这个容量。

这种方式可以有效控制系统的请求速率,防止突发大流量对系统造成压力。令牌桶算法的优点在于对突发流量的处理比较灵活,可以通过调整生成令牌的速率和桶的容量来适应不同的场景。

Redis键值设计

我们使用Redis的Hash结构来构造一个对象,Key的命名规则为【rate_limiter_+接口别名】,Value为以下两个字段

last_time:13位毫秒级时间戳,上次请求获取令牌的时间stored_token:令牌桶中当前存储可用的令牌数量

lua脚本的

KEYS:

Redis中Hash结构的Key

ARGV:

qps:接口的qps值,当做令牌桶的最大容量add_rate:往令牌桶中添加令牌的速率,单位毫秒current_time:当前13位时间戳

借助这个hash结构使用令牌桶算法来控制请求接口的qps。每次调接口就去获取一个令牌,如果能获取到就放行,如果获取不到就等待重试。

每次调用时,比较当前时间戳和 last_time 的差值,根据 add_rate 计算这段时间应该产生几个令牌,并放到桶中,放满为止。计算完后,如果stored_token够本次需要的令牌数(固定为1)则将stored_token减一,返回成功(1)。如果数量不够的话,返回失败(0)。

代码实现

lua脚本   rate_limiter.lua

local api_key = KEYS[1] -- 限流hash结构的key

local qps = ARGV[1] -- 接口的qps

local add_rate = ARGV[2] -- 往令牌桶中添加令牌的速率,单位毫秒

local current_time = ARGV[3] -- 当前13位时间戳

local cache = redis.call('HMGET', api_key, 'stored_token', 'last_time')

local stored_token = cache[1]

local last_time = cache[2]

-- 如果第一次调用,Hash结构还不存在,则令牌桶中的令牌数从0算起

if stored_token then

-- 和上次调用时间比较,计算这段时间应产生的令牌数

local calculate_token_num = (current_time - last_time) / add_rate

stored_token = math.min(stored_token + calculate_token_num, qps)

else

stored_token = 0

end

if stored_token >= 1 then

redis.call('HMSET', api_key, 'stored_token', stored_token - 1, 'last_time', current_time)

return 1

else

redis.call('HMSET', api_key, 'stored_token', stored_token, 'last_time', current_time)

return 0

end

java代码示例

@SpringBootTest

@Slf4j

public class LuaRateLimiterTest {

@Autowired

RedisTemplate redisTemplate;

public static final String RATE_LIMITER_KEY_PREFIX = "rate_limiter_";

@Test

public void rateLimiterTest() throws InterruptedException {

// 热身一下redis连接

warmRedis();

// 假定api接口名为getUser, qps被限制为5

String apiName = "getUser";

Integer qps = 5;

// 模拟并发调用20次接口

CountDownLatch countDownLatch = new CountDownLatch(20);

log.info("开始调用,{}", DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_PATTERN));

for (int i = 1; i <= 20; i++) {

int finalI = i;

CompletableFuture.runAsync(() -> {

while (true) {

if (acquireToken(apiName, qps)) {

log.info("调用编号:{},调用成功时间:{}", finalI, DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_PATTERN));

break;

}

try {

Thread.sleep(10);

} catch (InterruptedException e) {

throw new RuntimeException(e);

}

}

countDownLatch.countDown();

});

}

countDownLatch.await();

log.info("调用结束");

}

/**

* 请求令牌

*/

private boolean acquireToken(String apiName, Integer qps) {

String key = RATE_LIMITER_KEY_PREFIX + apiName;

// 计算令牌生成的速率,单位毫秒

long addRate = 1000L / qps;

long currentTime = System.currentTimeMillis();

DefaultRedisScript redisScript = new DefaultRedisScript<>();

redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/rate_limiter.lua")));

redisScript.setResultType(Long.class);

Long result = redisTemplate.execute(redisScript, Collections.singletonList(key), qps, addRate, currentTime);

return result != null && result == 1d;

}

/**

* 正常工作前,热身redis连接

*/

private void warmRedis() {

for (int i = 0; i < 10; i++) {

Object o = redisTemplate.opsForValue().get("aaa");

}

}

}

结果打印

首次执行

开始调用,2023-12-29 03:11:54.021

调用编号:9,调用成功时间:2023-12-29 03:11:54.241

调用编号:1,调用成功时间:2023-12-29 03:11:54.442

调用编号:7,调用成功时间:2023-12-29 03:11:54.646

调用编号:10,调用成功时间:2023-12-29 03:11:54.842

调用编号:8,调用成功时间:2023-12-29 03:11:55.040

调用编号:18,调用成功时间:2023-12-29 03:11:55.261

调用编号:19,调用成功时间:2023-12-29 03:11:55.441

调用编号:3,调用成功时间:2023-12-29 03:11:55.641

调用编号:4,调用成功时间:2023-12-29 03:11:55.844

调用编号:12,调用成功时间:2023-12-29 03:11:56.053

调用编号:5,调用成功时间:2023-12-29 03:11:56.248

调用编号:15,调用成功时间:2023-12-29 03:11:56.442

调用编号:14,调用成功时间:2023-12-29 03:11:56.644

调用编号:2,调用成功时间:2023-12-29 03:11:56.843

调用编号:11,调用成功时间:2023-12-29 03:11:57.052

调用编号:20,调用成功时间:2023-12-29 03:11:57.256

调用编号:16,调用成功时间:2023-12-29 03:11:57.446

调用编号:6,调用成功时间:2023-12-29 03:11:57.644

调用编号:13,调用成功时间:2023-12-29 03:11:57.858

调用编号:17,调用成功时间:2023-12-29 03:11:58.055

调用结束

可以看到每次请求的间隔都在200毫秒左右,满足我们qps=5的设定。

再次执行

开始调用,2023-12-29 10:01:31.813

调用编号:4,调用成功时间:2023-12-29 10:01:31.905

调用编号:1,调用成功时间:2023-12-29 10:01:31.905

调用编号:5,调用成功时间:2023-12-29 10:01:31.905

调用编号:2,调用成功时间:2023-12-29 10:01:31.905

调用编号:7,调用成功时间:2023-12-29 10:01:31.905

调用编号:12,调用成功时间:2023-12-29 10:01:32.054

调用编号:6,调用成功时间:2023-12-29 10:01:32.267

调用编号:3,调用成功时间:2023-12-29 10:01:32.445

调用编号:14,调用成功时间:2023-12-29 10:01:32.646

调用编号:13,调用成功时间:2023-12-29 10:01:32.850

调用编号:17,调用成功时间:2023-12-29 10:01:33.067

调用编号:8,调用成功时间:2023-12-29 10:01:33.271

调用编号:15,调用成功时间:2023-12-29 10:01:33.486

调用编号:20,调用成功时间:2023-12-29 10:01:33.683

调用编号:18,调用成功时间:2023-12-29 10:01:33.884

调用编号:16,调用成功时间:2023-12-29 10:01:34.086

调用编号:10,调用成功时间:2023-12-29 10:01:34.265

调用编号:11,调用成功时间:2023-12-29 10:01:34.461

调用编号:9,调用成功时间:2023-12-29 10:01:34.653

调用编号:19,调用成功时间:2023-12-29 10:01:34.877

调用结束

由于和上次执行有时间间隔,期间令牌桶被放满了,所以本次调用的前5个令牌请求直接就成功了,后续仍是200毫秒放行一个请求。

完活儿,这个限流算法是一个经典算法了,使用Redis来完成这个算法对于目前流行的分布式集群应用也很是实用,需要的同学代码自取,谢谢观看!

推荐阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。