JS如何做请求并发限制
在前端领域里,最常用的并发手段就是异步,即不因为资源的消耗而阻塞程序的执行。从逻辑上讲,异步并不是为了并发,而是为了不阻塞主线程。但是我们却可以同时发起多个异步操作,来起到并发的效果,虽然计算的过程是同步的。对于我们web服务而言,使用 async/await 的确很舒服,但是适当的合并请求,使用 Promise.all 才能提高性能。
# 限制并发
一旦你习惯了 Promise.all,同时了解了 EventLoop 的机制,你会发现 I/O 请求的限制往往在下游。因为对于 服务端、node端 来说,同时发送 10 个 RPC 请求和同时发送 100 个 RPC 请求的成本差别并不大,都是“发送-等待”的节奏,但是下游的“供应商”是会受不了的,这时你需要限制并发数。
# 限制并发数
我们可以通过p-limit (opens new window)包的源码来了解如何限制前端异步并发数
首先,p-limit依赖了yocto-queue (opens new window)包实现了一个先进先出的链表来做异步函数队列的存储。代码很简单,我们可以直接先看其源码以便后续的了解:
class Node {
value;
next;
constructor(value) {
this.value = value;
}
}
export default class Queue {
#head;
#tail;
#size;
constructor() {
this.clear();
}
enqueue(value) {
const node = new Node(value);
if (this.#head) {
this.#tail.next = node;
this.#tail = node;
} else {
this.#head = node;
this.#tail = node;
}
this.#size++;
}
dequeue() {
const current = this.#head;
if (!current) {
return;
}
this.#head = this.#head.next;
this.#size--;
return current.value;
}
clear() {
this.#head = undefined;
this.#tail = undefined;
this.#size = 0;
}
get size() {
return this.#size;
}
* [Symbol.iterator]() {
let current = this.#head;
while (current) {
yield current.value;
current = current.next;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
代码很简单,唯一需要注意的只有 Symbol.iterator 的迭代器。Symbol.iterator (opens new window) 为每一个对象定义了默认的迭代器。该迭代器可以被 for...of 循环使用。它赋予了Queue类循环遍历的能力。
我们先来看看p-limit的大致用法:
const fns = [
fetchSomething1,
fetchSomething2,
fetchSomething3,
];
const limit = pLimit(10);
Promise.all(
fns
.map(fn =>
limit(async () => {
await fn() // fetch1/2/3
})
) // map
); // Promise.all
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
接下来我们看看其具体的实现(代码略有精简,记录文章时自己重新实现了一遍):
import Queue from 'yocto-queue';
const pLimit = maxConcurrency => {
// ...
let activeCount = 0;
const queue = new Queue()
// ...
const enqueue = (fn, resolve, ...args) => {
queue.enqueue(run.bind(undefined, fn, resolve, ...args))
(async () => {
await Promise.resolve()
if (activeCount < maxConcurrency && queue.size>0) {
queue.dequeue()()
}
})()
/**
* 为什么需要async await ?
* 因为“activeCount”是异步更新的
* 此函数需要等到下一个微任务后再进行比较.
* 当run函数退出队列并被调用时。if语句中的比较也需要异步进行,以获取“activeCount”的最新值。
* 这里也是内部异步函数执行的开始
*/
}
const generator = (fn, ...args) => {
return new Promise((resolve) => enqueue(fn, resolve, ...args))
}
// ...
return generator
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
- pLimit初始化时传入了maxConcurrency(最大并发数)参数,初始化了activeCount(当前正在执行的异步任务数)和queue(队列),并返回了一个generator函数, 多个 generator 函数会共用一个队列,其内部返回了一个promise函数。
- generator函数执行后,将用run函数封装的请求添加进queue队列,并调用了一个自执行函数,这个函数发现 activeCount 小于最大并发数时,则调用 dequeue 弹出一个函数,并执行它。
const next = () => {
activeCount--;
if (queue.size > 0) {
queue.dequeue()();
}
};
const run = async (fn, resolve, args) => {
activeCount++;
// 这里将fn通过async进行包裹后是为了获得一个返回的Promise函数,以满足异步/Promise.all的要求
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {}
next();
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
- run函数做了三件事: 1.让activeCount数加一。 2.执行异步函数 fn,并将结果传递给 resolve, 为保证 next 的顺序,采用了 await result. 3.调用next函数
- next函数在得知上一个异步函数执行之后,将activeCount数减一,并从队列中拉出一个新的异步函数进行调用。
通过函数 enqueue、run 和 next,plimit 就产生了一个限制大小但不断消耗的异步函数队列,从而起到限流的作用。
编辑 (opens new window)
上次更新: 2022/12/11, 20:19:48