Promise 推荐实践 - 进阶篇:并发控制

相关文章:

上一篇的最后,我们列举了两个简单的逐个串行与并发执行的例子。不过日常实际开发中,我们还会遇到更复杂的场景——比如下载 300 张图片,上一篇中简单的写法就无法应对了。这次我们来说说如何更恰当地处理这类批量异步任务。

Meme, LOL

2023/12/28 - 更新,修复启动列表为空时 Promise.race() 无法 resolve 导致卡住的问题。

1. 批量异步任务

如果我们需要下载 300 张图片,该怎么处理呢?

(1) 同步编程思路

下载 300 张图片之前,我们应该会有它们的 URL,存在一个数组内。

按照传统的同步编程思路,我们首先想到的大概是这个样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
const urlList = [/* 这是300张图片的 URL... */];

function downloadImage(url) {
/* 这是一个同步下载图片的函数 */
}

urlList.forEach((url) => {
console.log('开始下载:', url);
downloadImage(url);
console.log('下载完成:', url);
});

// 或者使用 for 循环
for (let i = 0, total = urlList.length; i < total; i += 1) {
const url = urlList[i];
console.log('开始下载:', url);
downloadImage(url);
console.log('下载完成:', url');
}

问题:瞬间高并发

而实际上,由于浏览器内 JavaScript 执行与 UI 是阻塞的单线程模式,下载图片这种耗时的任务必须作为异步任务来处理,以免下载完成前阻塞 UI 线程导致页面完全卡死,让用户无法继续交互操作。

那么调用 downloadImage() 后,浏览器将会启动一个异步的下载任务,而下载完成状态将在回调函数中异步触发(而非启动下载的下一句)。

所以在我们上面的循环中,执行 downloadImage() 启动下载后将会立刻执行下一次循环,马上启动下一张图片的下载——也就是说,上面的代码将会瞬间发出了 300 个下载图片的网络请求

呃,理论上说 300 张图片的下载任务之间没有前后依赖的逻辑关系,确实可以并发执行。

但是真的直接一把梭同时发起 300 个请求,先不论你的客户端性能是否可以 hold 住这样的瞬间高负载任务,就说服务器一下收到你这么多请求,也要把你当作恶意攻击服务器的黑客,直接给拉黑屏蔽了。不然每个客户端都这么来一出,服务器性能和带宽也要遭不住。

这么搞显然是不行的。

(2) 简单串行的问题

通过 Promise + async/await,我们可以将上面的写法稍作改动,用类似同步编程的思路,实现 300 张图片的逐个下载:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const urlList = [/* 这是300张图片的 URL... */];

function downloadImage(url) {
/** 异步下载图片 */
return new Promise((rs) => {
const img = new Image();
img.onload = rs;
img.src = url;
});
}

async function batchDownload() {
for (let i = 0, total = urlList.length; i < total; i += 1) {
const url = urlList[i];
console.log('开始下载:', url);
await downloadImage(url);
console.log('下载完成:', url');
}
}

batchDownload().catch(ex => console.error(ex));

问题:不能直接使用 Array.forEach

这里我们使用的是 for 循环而不是 Array.forEach(),因为后者需要传入一个新的闭包函数来处理每个链接的异步任务,那这个闭包函数就需要使用 async 函数,那上面的函数就会变成:

1
2
3
4
5
6
7
async function batchDownload() {
urlList.forEach(async (url) => {
console.log('开始下载:', url);
await downloadImage(url);
console.log('下载完成:', url');
});
}

而上一篇我们就已经知道,async 函数其实就是 Promise 的语法糖,所以这段代码的实际效果其实相当于:

1
2
3
4
5
6
7
8
9
function batchDownload() {
urlList.forEach((url) => new Promise((rs) => {
console.log('开始下载:', url);
return downloadImage(url);
}).then(() => {
console.log('下载完成:', url');
});
return Promise.resolve();
}

有些同学可能应该已经看出问题了,简单来说就是:每个下载任务内都是类似同步顺序执行(先打印“开始下载”,下载完成后打印“下载完成”),但所有 Promise 都在瞬间同时被创建,所以整个下载任务仍然是瞬间发出了300个请求

问题:不推荐在 for 循环内 await

而上一个方案里,使用 for 的写法看起来比较简单便捷,虽然取数组长度、递增和获取成员的代码有点啰嗦,但也可以使用 for-of 语法来简化达到类似 Array.forEach() 的便捷程度(不过会导致更难获取当前处理的下标序号 index):

1
2
3
4
5
6
7
async function batchDownload() {
for (const url of urlList) {
console.log('开始下载:', url);
await downloadImage(url);
console.log('下载完成:', url');
}
}

但这样的写法也是有问题的,常见的 eslint 规则集(比如 eslint-config-airbnb)一般都会开启一条规则:no-await-in-loop

个人认为设置这个限制的大致原因可能有两个:

  1. 无法利用异步并发能力,导致代码效率低下
  2. for-of 循环过程中,对原数组的成员增减操作将会影响循环的执行。项目规模较大时,某些意外流程可能因此使循环无法如预期结束而导致失控

因此,我们的理想处理方案应该是:

  1. 提供类似 Array.forEach() 的便捷语法;
  2. 可以控制多个任务并发执行,提高效率。

2. 探索解决方案

(1) 模拟异步任务

为了让异步任务的效果更直观可见,我们先创建一个页面模拟异步加载的效果:

Demo 01

以下是页面代码:

1
2
3
4
5
6
<!-- index.html -->
<!DOCTYPE html>

<body></body>

<script type="module" src="./index.js"></script>
1
2
3
4
5
6
7
8
9
10
11
12
13
// index.js
import { createTask, pushMsg } from './task-list.js';

async function main() {
const task = createTask({
speed: 5,
});
pushMsg('任务开始');
await task.run();
pushMsg('任务结束');
}

main().catch(ex => console.error(ex));
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// task-list.js
const style = document.createElement('style');
style.innerHTML = /* css */`
.task-item {
margin: .5em 0;
max-width: 400px;
height: 1em;
border-radius: 0.2em;
border: 1px solid rgba(33,33,33,.25);
background: white;
overflow: hidden;
}

.task-progress {
width: 0%;
height: 100%;
background: green;
}

.msg-item {
margin: 0;
font-size: 12px;
line-height: 1.4;
}
`;
document.body.appendChild(style);

const list = document.createElement('div');
document.body.appendChild(list);

const msgs = document.createElement('div');
document.body.appendChild(msgs);

export function createTask({ speed = 1, failProb = 0 }) {
const item = document.createElement('div');
item.className = 'task-item';
const progress = document.createElement('div');
progress.className = 'task-progress';
item.appendChild(progress);
list.appendChild(item);

let current = 0;
const total = 100;
const interval = 100;

/**
* @param {() => void} callback
* @param {() => void} fail
*/
const tick = (callback, fail) => {
setTimeout(() => {
current = Math.min(current + speed, total);
progress.style.width = `${(current * 100) / total}%`;
if (Math.random() < failProb) {
progress.style.background = 'red';
fail();
return;
}
if (current === total) {
callback();
} else {
tick(callback, fail);
}
}, interval);
};

return {
/** @returns {Promise<void>} */
run: () => new Promise((rs, rj) => {
tick(rs, rj);
}),
};
}

export const pushMsg = (content = '') => {
const msg = document.createElement('p');
msg.className = 'msg-item';
msg.textContent = content;

const { children: msgItems } = msgs;
if (msgItems.length > 0) {
const first = msgItems[0];
msgs.insertBefore(msg, first);
} else {
msgs.appendChild(msg);
}
};

(2) 通过 reduce 串行执行

如果不考虑并发的话,其实上面 Array.forEach() 方案的问题改一改就能解决问题了,突破口就是 Array.reduce()

既然之前每个数组成员的迭代结果都会变成 new Promise(),而 Array.reduce() 可以将前一次迭代的结果传给下一次迭代。

那我们如果将它们结合一下,在每次迭代开始时先 await 前一次迭代的 Promise 完成,以此类推不是就能完成每个任务之间逐个等待完成,直到最终任务完成了?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* @function
* @template T
* @param {T[]} list
* @param {(item: T, index: number) => Promise<void>} callback
*/
export const iteratePromise = async (list, callback) => {
await list.reduce(async (prevPromise, cur, index) => {
// 每次开始时,先等待前一次迭代完成
await prevPromise;
// 等待本次迭代完成
await callback(cur, index);
}, Promise.resolve());
};

因为返回值是 await Promise 的 async 函数,可以省略最终的 await,所以还可以稍作简化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// index.js
import { createTask, pushMsg } from './task-list.js';

/**
* @function
* @template T
* @param {T[]} list
* @param {(item: T, index: number) => Promise<void>} callback
* @returns {Promise<void>}
*/
export const iteratePromise = (list, callback) =>
list.reduce(async (prevPromise, cur, index) => {
// 每次开始时,先等待前一次迭代完成
await prevPromise;
// 等待本次迭代完成
await callback(cur, index);
}, Promise.resolve());

async function main() {
const taskList = Array.from({ length: 3 }, () => createTask({
speed: 10,
}));
await iteratePromise(taskList, async (task, i) => {
pushMsg(`任务 ${i} 开始`);
await task.run();
pushMsg(`任务 ${i} 结束`);
});
pushMsg('所有任务完成');
}

main().catch(ex => console.error(ex));

可以看到多个异步任务逐个进行,效果符合预期:

Demo 02

改进版:保留前一次迭代结果,返回最终结果

我们还可以对这个 iteratePromise() 做个小改动,实现类似 Array.reduce() 基于前一次迭代结果继续计算的效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* @function
* @template T
* @param {T[]} list
* @param {(prevValue: T | undefined, item: T, index: number) => Promise<T>} callback
* @param {T} [startValue]
*/
export const reducePromise = async (list, callback, startValue) => {
let prevValue = startValue;
await list.reduce(async (prevPromise, cur, index) => {
await prevPromise;
prevValue = await callback(prevValue, cur, index);
}, Promise.resolve());
return prevValue;
};

这个版本可以用于实现 对多个异步任务进行合计运算、基于前一个任务的结果调整后一个任务的运算策略 等效果。

(3) 允许指定数量的任务并发

上面的方案达到了异步任务批量串行执行的基本诉求,接下来我们就要考虑如何控制同一时间内允许指定数量的异步任务并行执行。

最简单粗暴的思路就是直接对上面的任务数组进行均匀切分,假如我们允许同时3个任务并发执行,那么:

  • 每3个成员放入一个子数组,作为 任务组,最终将整个任务数组转换为一个二维的任务组数组;
  • 再逐个对任务组内的任务进行 Promise.all() 并行执行;
  • 每个任务组之间就用上面的 iteratePromise() 串行执行。

这个版本实际效果并不理想,这边就不实现了。

问题在于每个任务组内部分任务完成时,并不能马上开始下一组任务,下一组任务仍然需要等待前一组任务的所有任务完成后才能开始,策略过于僵硬。

所以,实际上每组任务都会存在一段部分任务完成后等待组内最慢任务的“偷懒”时间,而不是我们理想状态下每时每刻都有3个任务在跑的效果。

(4) 通过 race 做并发控制

基本思路

为了确保每一时刻尽量跑满我们所预期的并发数量,就需要视情况随时调整进行中的任务。这个动态调控的运行任务列表,我们暂且称之为 任务池

在每个任务完成时,我们从任务池里剔除已完成的任务,加入等待中的任务,已维持全程并发数量都达到我们的预设数量(除非剩余任务数已经不足)。

这里我们使用 Promise.race() 来处理任务池,就可以在其中任一任务结束时进行响应处理,基本思路如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/** 并发数量限制 */
const concurrent = 3;
/** 任务池 */
const pool = [];
/** 等待队列 */
const waiting = [];

function runPool(onComplete) {
/* TODO:
将等待队列装填至 pool,直到 pool
达到 concurrent 限制或 waiting 队列已空
*/

Promise.race(pool).then(() => {
/* TODO:
将完成的任务从 pool 中移除。
然后判断 pool 和 waiting 是否还有任务,
若皆为空则 isComplete = true
*/

if (isComplete) onComplete();
else runPool(onComplete);
});
}

顺带一提,关于 Promise.all()Promise.race()Promise.any() 三者的异同:

  • 它们都接收一个返回 Promise 的可迭代对象(如 Promise 数组),返回一个包装后的新 Promise;
  • Promise.all() 返回的新 Promise 将在传入的所有成员全部被 resolve 时才会被 resolve,在任一成员被 reject 时立刻 reject(浏览器兼容性 Chrome >= 32);
  • Promise.any() 则是在所有成员全部被 reject 时才会被 reject,在任一成员被 resolve 时立刻 resolve(浏览器兼容性 chrome >= 85);
  • Promise.race() 在任一成员被 resolve 或 reject 时,立刻返回相同结果(浏览器兼容性 chrome >= 32);
  • 简而言之,all()any() 的策略类似于 成员结果间 &&|| 运算作为最终结果,并且带短路运算的特性。而 race() 则是 竞赛机制,只看第一个结束的成员结果

实现算法

为了方便搜寻和增减任务,我们把任务池 pool 的类型由数组改为 Record 键值对,然后再每个任务完成时返回自己的 key。

完成上面的 TODO 之后,我们的算法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* @function
* @template T
* @param {T[]} list
* @param {(item: T, index: number) => Promise<void>} callback
*/
export const eachPromise = (list, callback) => {
let concurrent = 1;
const remain = [...list];
/** @type {Record<string, Promise<string>>} */
const pool = {};

/** @param {() => void} onComplete */
const runPool = (onComplete) => {
// 计算任务序号、待填充数量
const offset = list.length - remain.length;
const sliceSize = concurrent - Object.values(pool).length;
// 切片后填充任务进池子
const slice = remain.splice(0, sliceSize);
slice.forEach((item, i) => {
const index = i + offset;
const key = String(index);
// 任务加入 pool,完成后返回自己的 key
pool[key] = callback(item, index)
.then(() => key);
});
// 判断是否已空
const poolTasks = Object.values(pool);
const isPoolEmpty = poolTasks.length === 0;
const isComplete = isPoolEmpty && remain.length === 0;
if (isComplete) {
onComplete();
return;
}
// 池内任务一起 race
Promise.race(poolTasks)
.then((key) => {
// 任一任务结束后,从池内删除,然后继续下一批
delete pool[key];
runPool(onComplete);
});
};
return {
/**
* @param {{ concurrent?: number}} [options]
* @returns {Promise<void>}
*/
run: (options = {}) => new Promise((rs) => {
concurrent = options.concurrent ?? concurrent;
// 启动
runPool(rs);
}),
};
};

使用上面的 eachPromise() 修改我们之前的测试页:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// index.js

async function main() {
const taskList = Array.from({ length: 5 }, () => createTask({
speed: Math.random() * 4 + 3,
}));
await eachPromise(taskList, async (task, i) => {
pushMsg(`任务 ${i} 开始`);
await task.run();
pushMsg(`任务 ${i} 结束`);
}).run({ concurrent: 3 });
pushMsg('所有任务完成');
}

main().catch(ex => console.error(ex));

看看效果:

Demo 03

OK,至此我们的目标终于基本达成。

3. 改进与完善

(1) 异常处理

上面的 eachPromise() 暂时只是最基本的核心算法,对于各种特殊情况都还没有做考虑:

  • 如果某个成员任务执行异常,被 reject 怎么处理;
  • 如果已经启动的处理后,又被意外再次调用 run() 怎么办。

……等等各种情况,都是我们需要考虑并处理的。

毕竟异步任务提高效率的代价,就是让编码更复杂,你需要考虑各种情况做好处理。

正如我们调侃多线程那句老话:“你有2个问题需要处理,通过使用多线程后,你在现个问6题了有。”

Meme, Multithreading

不过上面的两个小问题还不足为虑,前者只要在每个任务启动时的 .then() 中,增加 onRejected 情况的处理,也返回任务 key 就行了:

1
2
3
4
5
6
7
// ...

// 任务加入 pool,完成后返回自己的 key
pool[key] = callback(item, index)
.then(() => key, () => key);

// ...

后者的话,可以在每次 race() 时保留当前 Promise 作为依据,判断任务已经执行时,就不再启动:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
export const eachPromise = (list, callback) => {
// ...

/** @type {null | Promise<void>} */
let running = null;

/** @param {() => void} onComplete */
const runPool = (onComplete) => {
// ...
running = Promise.race(Object.values(pool))
.then((key) => {
// ...
});
};
return {
/**
* @param {{ concurrent?: number}} [options]
* @returns {Promise<boolean[]>}
*/
run: (options = {}) => new Promise((rs) => {
if (running !== null) return;
// ...
}),
};
};

(2) 各任务执行结果

经过上一步我们的修改后,虽然有成员任务失败后不再影响整个任务池的运作,但是在所有任务结束后,我们没法直到哪些任务被成功完成、哪一些失败了。

所以我们还可以再对于每次任务的执行结果进行记录,最后在结束所有任务后,像 Promise.all() 一样将执行结果以数组的形式返回。

最终我们相对完善的代码版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* @function
* @template T
* @param {T[]} list
* @param {(item: T, index: number) => Promise<void>} callback
*/
export const eachPromise = (list, callback) => {
let concurrent = 1;
const remain = [...list];
/** @type {Record<string, Promise<string>>} */
const pool = {};
/** @type {boolean[]} */
const result = [];
const setResult = (index = 0, res = false) => {
result[index] = res;
};

/** @type {null | Promise<void>} */
let running = null;

/** @param {() => void} onComplete */
const runPool = (onComplete) => {
const offset = list.length - remain.length;
const sliceSize = concurrent - Object.values(pool).length;
const slice = remain.splice(0, sliceSize);
slice.forEach((item, i) => {
const index = i + offset;
const key = String(index);
pool[key] = callback(item, index)
.then(() => setResult(index, true), () => setResult(index, false))
.then(() => key);
});
// 判断是否已空
const poolTasks = Object.values(pool);
const isPoolEmpty = poolTasks.length === 0;
const isComplete = isPoolEmpty && remain.length === 0;
if (isComplete) {
onComplete();
return;
}
// 池内任务一起 race
running = Promise.race(poolTasks)
.then((key) => {
// 任一任务结束后,从池内删除,然后继续下一批
delete pool[key];
runPool(onComplete);
});
};
return {
/**
* @param {{ concurrent?: number}} [options]
* @returns {Promise<boolean[]>}
*/
run: (options = {}) => new Promise((rs) => {
if (running !== null) return;
concurrent = options.concurrent ?? concurrent;
runPool(() => {
running = null;
rs(result);
});
}),
};
};

调整之前的测试代码,模拟一个任务出错概率,并输出最后的任务执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// index.js
async function main() {
const taskList = Array.from({ length: 5 }, () => createTask({
speed: Math.random() * 4 + 3,
failProb: 0.02,
}));
const result = await eachPromise(taskList, async (task, i) => {
pushMsg(`任务 ${i} 开始`);
await task.run();
pushMsg(`任务 ${i} 结束`);
}).run({ concurrent: 3 });
pushMsg('所有任务完成');
pushMsg(`- 各任务完成状态: ${JSON.stringify(result)}`);
}

检查效果:

Demo 04

OK。

(3) TypeScript 版本

本文上面的代码对需要提示的数据类型进行了 JsDoc 标注,日常使用时,只要开启项目 tsconfig/jsconfigcheckJs: true 配置,就已经可以得到比较完善的 IDE 智能提示和纠错了。

不过如果想直接把上面的函数加入到 TypeScript 项目,那还是再稍微改写一下,提供 TypeScript 的版本吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
export const iteratePromise = async <T>(
list: T[],
callback: (item: T, index: number) => Promise<void>,
) => {
await list.reduce(async (prevPromise, cur, index) => {
await prevPromise;
await callback(cur, index);
}, Promise.resolve());
};

export const reducePromise = async <T>(
list: T[],
callback: (prevValue: T | undefined, item: T, index: number) => Promise<T>,
startValue?: T,
) => {
let prevValue = startValue;
await list.reduce(async (prevPromise, cur, index) => {
await prevPromise;
prevValue = await callback(prevValue, cur, index);
}, Promise.resolve());
return prevValue;
};

export const eachPromise = <T>(
list: T[],
callback: (item: T, index: number) => Promise<void>,
) => {
let concurrent = 1;
const remain = [...list];

let running: null | Promise<void> = null;
const pool: Record<string, Promise<string>> = {};
const result: boolean[] = [];
const setResult = (index: number, res: boolean) => {
result[index] = res;
};

const runPool = (onComplete: () => void) => {
const offset = list.length - remain.length;
const sliceSize = concurrent - Object.values(pool).length;
const slice = remain.splice(0, sliceSize);
slice.forEach((item, i) => {
const index = i + offset;
const key = String(index);
pool[key] = callback(item, index)
.then(() => setResult(index, true), () => setResult(index, false))
.then(() => key);
});
// 判断是否已空
const poolTasks = Object.values(pool);
const isPoolEmpty = poolTasks.length === 0;
const isComplete = isPoolEmpty && remain.length === 0;
if (isComplete) {
onComplete();
return;
}
// 池内任务一起 race
running = Promise.race(poolTasks)
.then((key) => {
// 任一任务结束后,从池内删除,然后继续下一批
delete pool[key];
runPool(onComplete);
});
};
return {
run: (options: {
concurrent?: number,
} = {}) => new Promise<boolean[]>((rs) => {
if (running !== null) return;
concurrent = options.concurrent ?? concurrent;
runPool(() => {
running = null;
rs(result);
});
}),
};
};

结语

以上就是我关于使用 Promise 进行批量异步任务的并发控制处理的一点小思考和心得,希望能对大家有帮助或启发~

欢迎关注博客地址: https://blog.krimeshu.com/