你有没有试过用纸杯接消防栓的水?
这就是写JavaScript时数据涌入速度远超处理能力的真实感受。可能是海量Webhook轰炸,可能是巨型文件读取,也可能是高频WebSocket推送。发送端比接收端快,系统就会崩。
你需要一个等候室——让数据先待着,等应用准备好再处理。计算机科学里叫"队列"或"通道"。今天我们从零手写一个,就叫它"邮箱"。
想象真实的物理邮箱。邮递员只管投信,不在乎你在不在家;你去查看,有信就拿,没信就等。我们用Promise在JavaScript里复刻关系。
核心代码如下:
export class Mailbox {
constructor() {
messages = []
waiters = []
closed = false
}
push(msg) {
if (closed) throw new Error("Closed")
if (waiters.length > 0) {
const resolve = waiters.shift()
resolve(msg)
return
}
messages.push(msg)
}
async pop() {
if (messages.length > 0) return messages.shift()
if (closed) return null
return new Promise((resolve) => {
waiters.push(resolve)
})
}
close() {
closed = true
while (waiters.length > 0) {
const resolve = waiters.shift()
resolve(null)
}
}
get size() { return messages.length }
async *[Symbol.asyncIterator]() {
while (true) {
const msg = await pop()
if (msg === null) break
yield msg
}
}
}
最精妙的部分在pop()方法里。通常Promise用于fetch()场景:发起请求,等待响应。但这里我们玩了个花招——邮箱为空时,新建Promise,把resolve函数塞进waiters数组。相当于把"完成Promise的能力"封存起来。
代码就此暂停,原地等待。直到push()被调用,从waiters里取出那个封存的resolve,用新消息触发它。暂停的代码瞬间苏醒,拿到数据。
因为底部那个看似奇怪的[Symbol.asyncIterator],使用方式异常简洁:
const box = new Mailbox();
// 消费者:等待邮件
async function receive() {
for await (const msg of box) {
console.log("收到:", msg);
}
}
// 生产者:投递邮件
box.push("第一封信");
box.push("第二封信");
box.close(); // 结束
receive();
for await...of循环会自动处理所有异步等待,close()触发后循环优雅退出。没有回调地狱,没有事件监听器堆积,代码读起来像同步逻辑。
模式解耦了生产者和消费者。两边不需要知道对方存在,不需要协商节奏,Mailbox在中间做缓冲。快的一方不会压垮慢的一方,数据不会丢失。
实际场景里,可以用它做限流:WebSocket消息先进邮箱,消费者按自己节奏处理。也可以做背压控制:文件读取流太快时,让读取端等消费端追上。还能解耦复杂流程,比如用户上传图片后,缩略图生成、元数据提取、通知发送各自作为独立消费者订阅邮箱。
关键优势在于"零依赖"。不需要RxJS,不需要Node.js的stream模块,纯语言特性实现。理解原理后,可根据业务定制:加优先级队列、持久化存储、甚至分布式扩展。
异步编程最难的是控制节奏。邮箱模式把"何时处理"的决定权交给消费者,生产者可无脑投递。这种反转,正是应对洪水的堤坝。
热门跟贴