0%

限流工具RateLimiter和Semaphore解析

前言

本文主要介绍限流工具RateLimiter和Semaphore

  1. RateLimiter介绍
  2. Semaphore
  3. 使用场景分析

RateLimiter

RateLimiter原理简介

  RateLimiter是谷歌出品的一个限流工具,使用令牌桶策略进行限流。所谓令牌桶策略,直观上来看就是:规定桶里最多能放N个令牌,如果桶没满,则按照一定的速率(可以是匀速,也可以是变速)往桶里追加令牌,如果有线程打到了这个实例上,则会拿走一个或多个令牌,如果没有令牌可以拿,就会阻塞几秒,直到可以拿到令牌为止。

  这种原理很好理解,有点类似小时候”浴缸一边加水一边放水“的感觉,但同时这个如果按照这个理解去实现这个RateLimiter的话,会出现很多的问题,比如”我们如何实现每秒钟往桶里放令牌呢?“,上面的思路实现就会是:这个类会始终启动一个线程无时无刻去放令牌。这种做法显然会造成极大的资源浪费。在下一小节,我将针对RateLimiter的部分源码来解释一下谷歌对于限流的实现。

RateLimiter源码解析

   先说结论:RateLimiter本身不会自启一个线程去往桶里放令牌,放令牌的数量是基于时间的,RateLimiter中会记录下一个请求从何时(记为x)开始才能获取到令牌,根据当前请求的时间(y)和x作比较,如果x < y,则一定可以获得令牌(可预支),然后根据其申请的令牌数计算下次可以获取到令牌的具体时间;如果x > y,则说明目前是桶中是没有令牌的,则会计算出线程获得令牌需要等待的时间(t),同时也会进一步更新,线程将沉睡t时间后再去执行方法。

这里有两点需要注意:

  1. 线程获取令牌这个操作是串行的,不会出现两个线程同时争夺同一块令牌的情况;
  2. 同时,如果桶中只有10个令牌,我的请求需要20个令牌,我是可以立刻获取到这些令牌并执行方法的,但是下一个请求将会等待这多出来的生产10个令牌的时间。这里给出谷歌官方文档的原文: It is important to note that the number of permits requested never affects the throttling of the request itself (an invocation to acquire(1) and an invocation to acquire(1000) will result in exactly the same throttling, if any), but it affects the throttling of the next request. I.e., if an expensive task arrives at an idle RateLimiter, it will be granted immediately, but it is the next request that will experience extra throttling, thus paying for the cost of the expensive task.

  下面我先介绍一下RateLimiter类的一些基本概念。RateLimiter是一个抽象的基类,自身是不含有成员变量的,仅包含了一些通用的方法,具体参数的定义都放在了它的子类SmoothRateLimiter中,这个类又派生出了两个”限流桶“的实现内部类,分别是:SmoothBursty类和SmoothWarmingUp类。前者是恒定速率生产令牌的”限流桶“,后者是变速生产令牌的桶(类似于TCP的慢开始策略)。本文因为篇幅限制,仅介绍实现稍微简单一些的SmoothBursty类。

  • 上面讲到,”限流桶“其中一个实现类是SmoothBursty,下表列出了这个类的核心参数和含义。
参数 参数含义
double storedPermits 当前存储的令牌数量
double maxPermits 允许存储的最大令牌数量 = maxBurstSeconds * permitsPerSecond
double stableIntervalMicros 稳定生产一个令牌所用的时间(微秒)
private long nextFreeTicketMicros 下次可以获取到令牌的具体时间,如果当前时间小于这个时间,则不能获取
final double maxBurstSeconds 桶中可以放几秒的限值(默认是一秒,可以用来应对突发的大流量)

  接下来,我将介绍一下RateLimiter类的几个核心方法:

  • RateLimiter.create(double permitsPerSecond)这个方法会生成一个匀速产生令牌的”限流桶”:SmoothBursty。这个函数没什么好讲的,主要就是将以上的参数都给赋值,尤其是把 storedPermits给填满。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// created方法是RateLimiter类的方法,核心是setRate,限流器核心参数的赋值都是基于我们设置的permitsPerSecond
@VisibleForTesting
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}

// setRate方法在下层有两个操作,其中一个是用来给stableIntervalMicros赋值,另一个用来调整maxPermits和stroedPermits

final void doSetRate(double permitsPerSecond, long nowMicros) {
// 核心方法,根据时间来更新令牌数
resync(nowMicros);
// 这个很好理解
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}

void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
// 防止更新了桶的大小后,令牌数超标的情况
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
  • 以上代码中有一个核心的方法:resync(nowMicros)。这个方法将根据目前线程的请求时间和上一次请求所间隔的时间,来计算出这段时间中桶中生产了多少个令牌,并更新这个桶中的令牌数量,这个方法是RateLimiter实现添加加令牌操作的基础方法。
1
2
3
4
5
6
7
8
9
10
11
12
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
// 计算出这段时间内,一共可以生产多少令牌
// coolDownIntervalMicros()对于SmoothBurst而言一直是stableIntervalMicros
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
// 添加令牌, 如果桶空闲了,令牌数是可以填满的
storedPermits = min(maxPermits, storedPermits + newPermits);
// 目前下一次获取令牌的时间是当前请求的时间,因为进到这个方法里表示请求一定可以拿到令牌(下面的方法会讲,RateLimiter可以预约令牌)
nextFreeTicketMicros = nowMicros;
}
}
  • RateLimiter.acquire(int permits):这个方法是从桶中拿走permits个令牌,入参可不写,默认拿走一个令牌,返回可以获得令牌等待了多长时间。如果不能够取得足够数量的令牌,则线程会阻塞一段时间,然后再尝试获取。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public double acquire(int permits) {
// 核心方法,意思为”预定“,每个线程都会去申请这个令牌的获取,返回需要等待的时间
long microsToWait = reserve(permits);
// 线程根据等待的时间进行阻塞
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

// reserve操作加了锁,保证线程不会争夺同一块令牌
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
// 没什么好讲的,这个就是在计算需要等的时间
// reserveEarliestAvailable计算了当前请求能够拿到令牌的系统时间,如果是future就会阻塞,不是future就是0,表示立刻就能拿到
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}

// 具体限流桶的方法,两种限流桶不太一样(主要是因为生成的速率不一样,思路是一样的)
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// 根据时间间隔计算桶中当前应该有多少令牌,更新一下stroedPermits,并把nextFreeTicketMicros和当前时间同步
resync(nowMicros);
// 进到这个方法,不管桶中令牌够不够都不会阻塞,如果不够,多余的开销由下一次请求承担,所以直接直接返回当前的时间,表示线程不会阻塞
long returnValue = nextFreeTicketMicros;
// 计算需要至多能从桶中拿多少的令牌
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
// 计算预支的令牌数
double freshPermits = requiredPermits - storedPermitsToSpend;
// 计算额外的开销
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
// 额外的开销将会更新到”下次可以获取到令牌的具体时间“,如果额外开销很大
// 这个时间就会是很遥远的future,下次请求阻塞的时间就越长
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
// 如果”预支“了,storedPermits就会清零
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}

RateLimiter总结

  RateLimiter是用来进行QPS限流的,请求获取令牌但不释放令牌,同时线程在获取令牌时是串行的,保证了限流数量的一致性。当有操作量大的线程一下子获取了多余的令牌后,RateLimter会使用预支的方式保证这个请求能够顺利响应,同时也不会影响到限流的可靠性,因为多余的开销会由下一次请求承担。同时RateLimiter桶在空闲时会将令牌先塞满桶,这样当在QPS突然非常高的时候(从空闲状态突然变高),RateLimiter可以短时间处理高于限值的请求数,等桶中令牌消耗完了,QPS就会趋于限值并稳定下来。但是RateLimiter获取令牌的操作是用sychronize实现的,因此其无法保证公平性。

Semaphore

Semaphore简介

  Semaphore是J.U.C提供的共享锁,名为”信号量”。这个策略比较直观,控制最大并发数,维护一个”信号量“记录当前可用并发量(同样用令牌数来表示信号量的数值),请求要执行操作需要获取一个令牌,方法执行结束后需要释放这个令牌。但信号量为0时(令牌数为0),则线程会进一个等待链表(head是个虚拟头结点,以下说的链表头都指head.next),进链表后这个线程机会阻塞,直到某个线程释放了令牌,这个释放了令牌的线程会唤醒处在链表头的线程,然后这个线程再去尝试获取令牌。 

Semaphore源码解析

  Semaphore主要是通过一个抽象的内部类Sync的两个子类:NonfairSync和FairSync(公平锁和非公平锁)来实现相关功能。这两个类的实现功能是一样的,就是线程获取令牌的公平性问题:进入等待的线程会形成一个链表,每个线程在忙循环中都会尝试去获取令牌,FairSync保证了只有链表头的线程才能够获取到最新的空闲令牌。

  • Sync类继承了AbstractQueuedSynchronizer(AQS)类,这个类中有实现功能的核心参数
参数 参数含义
private volatile int state 可用的令牌数量(设置的最大并发数),所有的操作都会基于这个值
static final class Node 等待线程的链表
Node类核心参数 参数含义
volatile Thread thread 每一个node与当前线程绑定
volatile int waitStatus 记录这个Node的状态,比如cancelled

Semaphore最核心的两个操作是acquire()和release(),下面将针对NonFairSync和FairSync详细介绍一下这两个方法。

  • acquire():这个方法会尝试获取一个令牌,对中断敏感,如果线程中断了,则会抛出异常(还有一个acquireUninterruptibly方法,对中断不敏感,这里不介绍了)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
    // 去获取一个令牌
public void acquire() throws InterruptedException {
// 获取令牌的具体实现
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// tryAcquireShared 尝试获取令牌,返回获取后剩余的令牌数量,<0则表示无法获取令牌
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

// NonfairSync版本tryAcquireShared
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
// 这个循环是为了配合CAS
for (;;) {
// 可用的令牌数
int available = getState();
// 剩余令牌数
int remaining = available - acquires;
// 根据java特性,如果剩余令牌数 < 0,就不会执行后面的CAS操作
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

// FaieSync版本tryAcquireShared
protected int tryAcquireShared(int acquires) {
for (;;) {
// 主要就是加了个这个判断,如果等待链表为空,或者当前线程在等待链表的开头(head.next),则可以获取
// 这避免了新来的线程”插队“
// 因为在多线程的情况下,即使有了等待队列,available也可能会大于0
// 此时新来的线程在非公平的情况下,可能会绕过链表直接获取到令牌
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

// 如果tryAcquire的值小于0了,则会执行doAcquireSharedInterruptibly方法来将执行忙循环
// 不停地去请求获取令牌,这个方法对于公不公平没有区分

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 当前请求第一次进来会在等待链表后面添加一个node
final Node node = addWaiter(Node.SHARED);
// 默认这个请求最终能获取令牌
boolean failed = true;
try {
// 线程被唤醒后再次去尝试获取令牌,获取不到就继续被阻塞
for (;;) {
// 查询这个node的prev
final Node p = node.predecessor();
// 如果这个prev是head再去尝试获取令牌(
// 等待列表是有序的,只有链表的头(head.next)才有资格去尝试获取令牌,也只有链表头会被唤醒)
if (p == head) {
// 公平锁可以保证这个线程一定可以获取到令牌,非公平锁不能保证
int r = tryAcquireShared(arg);
if (r >= 0) {
// 重新设置链表头
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}

// shouldParkAfterFailedAcquire: 重组链表,清除无效节点
// parkAndCheckInterrupt: 将当前线程挂起,同时检测中断
// 如果中断则捕获异常(这里多嘴一句,内部用了LockSupport.park()来阻塞线程
// 这个方法自己不能捕获中断,所以我们不能try..catch)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
// 放弃获取令牌,将node的状态置为cancelled
cancelAcquire(node);
}
}

// setHeadAndPropagate比想象中复杂
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 重新设置链表头
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 如果下一个节点为空,或者节点是共享模式(只有虚拟head是独占的,其他node都是共享模式),则尝试唤醒线程
// s == null 的时候一定不会唤醒后继节点,感觉有点多余
if (s == null || s.isShared())
// 尝试唤醒线程,在release方法里详细写
doReleaseShared();
}
}

这里注意一下,tryAcquireShared在AQS也有带有超时阻塞的重载方法tryAcquireSharedNanos(int arg, long nanosTimeout),当线程请求超时,直接退出自旋,但是其在Semaphore中没有使用。

  • release():释放令牌,这个操作没有特别大的学问。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 去释放一个令牌
public void release() {
// 释放的具体实现
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
// 尝试去释放令牌,tryReleaseShared已经增加了令牌数
if (tryReleaseShared(arg)) {
// 释放成功,唤醒链表头的线程
doReleaseShared();
return true;
}
return false;
}

// 这个函数比较难懂,最主要的原因就是在共享模式下
// 线程获取令牌和释放令牌都会尝试去唤醒下一个线程,所以这个函数加了一些if判断防止线程被重复唤醒,却没有唤醒应该唤醒的线程
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// SIGNAL表示下一个线程需要被唤醒
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 这个函数在唤醒线程
unparkSuccessor(h);
}
else if (ws == 0 && // 0表示dump状态,链表刚刚初始化
// PROPAGATE表示线程已经被唤醒,如果交换成功,则跳出if
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果同时有线程获取和释放了锁,这里的head可能改变,需要重新唤醒后继线程
if (h == head) // loop if head changed
break;
}
}

Semaphore总结

  Semaphore本质上一个基于AQS的共享锁,说到底还是一个锁的模式,如果锁没有释放,其他线程就得一直阻塞或者超时返回。一个非常直观的应用场景可以是数据库的连接,可以控制最大的并发数在1000,只要连接不断开,其他线程就无法访问数据库。

Semaphore和RateLimiter的区别

原理上的区别

  Semaphore限制了服务的最大并发量,并提供了等待队列来阻塞线程,当有令牌可用时会唤醒队列中的线程,队列中线程通过自旋的策略来保证自动获取令牌;而RateLimiter则是限制了QPS,本质上是对速率的一种控制。RateLimiter可能对服务器的CPU造成更大的压力,但也保证了很高的处理效率,因为其只控制在1秒内最多能有多少请求访问资源,但是不会管这个请求会占用资源多久:如果某个请求会占用这个服务2秒,我们限制了QPS为100,则这个服务最大并发量可能会有200。

使用场景上的区别

大多数场景下,限制QPS和限制并发量的场景都是互通的,限制的并发量。

  • 如果请求的平均响应时间是不确定的,那么建议控制并发量来保证服务器不会受到太大的压力;
  • 其他情况建议使用限制QPS的限流器:
    • 从两种种工具的实现上来看,Semaphore通过阻塞唤醒机制来控制线程,默认情况下线程被唤醒后不一定能够获取到令牌,从而又会重新进入阻塞状态,同时每次自旋都会重新处理等待链表也比较消耗资源,而RateLimiter会计算线程阻塞的时间,到时间自动唤醒,没有自旋,也没有额外开销,比Semaphore开销小。
    • 从应对突发请求来看,Semaphore没有机制来应对,突发的大流量可能直接使等待链表变得超级大,而RateLimiter可以通过”令牌空闲累积“以及“预支”的策略来保证1秒的内的实际请求量可能临时大于限值,等桶中令牌消耗完了就会趋于稳定,稳定在我们限制的QPS中,这样在一定程度上保证了服务的可用性

关于并发数和QPS的思考

   这里先明确一个概念,在实际的业务场景中,QPS是稳定的,影响服务器性能的主要是并发数,试想一下如果我们服务的响应时间特别短,在某一个时刻我们预期的并发数会很低,不会对服务器造成很大的压力,符合逻辑;如果时延很高,这就表示我们的服务链出了问题,这样系统在某个时刻的并发数就会很高,因为时延高会导致某些线程迟迟不释放系统资源。这也是符合逻辑的。

  根据以上逻辑,这套QPS和并发量的转换公式就是:$QPS = 1000 / t C$。$t$表示服务的平均响应时延,$C$表示该服务在这个QPS下的最大并发量。在使用Semaphore中,我们将会进行单机的并发数限流,通过上面的公式,我们可以对并发数的控制有一个相应的预期,*在控制并发数的前提下,服务的稳定性会影响QPS(默认在限流生效的情况下),如果服务时延降低,QPS预期会提高,如果服务时延提高,对应的QPS就会下降;我建议对被限流的请求进行打点统计,因为在限制并发数的前提下,QPS的降低不一定是用户侧的问题,也可能是服务稳定性的问题,比如网络的波动也会导致QPS降低。

  举个简单的例子:某个服务的QPS有15W,平均请求时延为40ms,部署了600个实例。

  这就是说,我们平时对这个服务的QPS期望不超过15W,单机平均QPS为:$Q = 150000 / 600 = 250$; 此时如果用Semaphore限流,设置的最大并发数为: $C = 250 * 40 / 1000 = 10$。也就是说,如果服务的时延稳定在40ms,理论上最大设置10个并发就能满足15WQPS的需要。

  日常使用的时候,用RateLimiter可能更加直观,但是RateLimiter有一个问题:在满额QPS在跑的时候,如果服务可用性下降,请求时延陡增,系统的并发数也会陡增,导致严重的后果。因此,如果使用RateLimiter,建议对服务的请求时延做一个监控,当时延超过一定的阈值的时候,对服务降级处理,防止对服务器产生过大的压力。

参考文献

  1. Google Guava API DOCS
  2. Java8 API DOCS
  3. DRF框架 Throttling API DOCS