异步 generator 函数是 ES2018 中新增的特性。Node.js 从 v10 版本增加了对异步 generator 函数的支持。异步 generator 函数看似一个相当小众特性特性,但是却为 node.js websocket 框架提供了一个灵巧的使用机会。
在这篇文章中,我将说明 Node.js websocket 框架将如何使用异步 generator 函数。
HTTP 框架分类
首先,想一下 Express 或 Hapi 之类的 HTTP 服务器框架。一般来说,大多数 HTTP 服务器框架都属于以下三种之一:
1. 显式响应。 在 Express 中发送一个 HTTP 响应,你必须调用 res.end(),res.json() 或者 res 对象上的一些其他方法。换句话说,你必须显式调用一个方法来发送一个响应。
2. 使用 return 隐式响应。 另一方面,Hapi 在 v17 中明确地删除了 reply() 函数,也就是说 Hapi 没有等同于 res 的方式。如果需要发送一个响应。你只需在请求的处理方法中 return 一个返回值。之后 Hapi 就会将 return 的值封装进一个 HTTP 响应中。
3. 在适当的位置修改响应。 Koa 使用了一种混合了以上两种实现的独特处理方式。你将以修改 ctx 对象的方式,替代调用 res 对象的方法来构建响应。
换句话说,一些 HTTP 框架要求你显式调用方法来发送 HTTP 响应,另一些框架会提供给你一个可更改的 HTTP 响应对象,还有一些框架仅需要处理函数中 return 一个值。
Websockets 和 HTTP 的区别在于,Websockets 服务器可以在任何时间向 socket 推送消息,不管是不是基于某条消息的响应。也就是说,初级的 websocket 框架,例如 ws, 看起来很像 “显式响应” 模式:你需要显式调用一个方法用于发送一条消息。
然而,是否可以在保持允许消息多发这个优点的同时,使 websockets 可以实现隐式响应?这就是异步 generator 产生的原因。
从服务器上读取大块数据
假设你有一个一次读取一堆文档的 Mongoose 指针,并且你希望用 websocket 在每一个文档读出时尽快将它发送出去。这种方式有助于在任何时刻都使服务器的内存使用量保持在最小:客户端可以获取所有的数据,而服务器却不用为此在内存中一次保存所有的数据。举个例子,这是使用 async/await 方式读取一个指针的实现:
const User = mongoose.model('User', mongoose.Schema({ name: String }));
const cursor = Model.find().cursor();
for await (const doc of cursor) {
console.log(doc.name); // Print user names 1 by 1.
使 generator 函数变得有趣的地方在于,在一个函数中 yield 方法可以被调用多次,并且在上次停止的地方继续运行,除了这点以外,yield 方法和 return 方法类似。
const User = mongoose.model('User', mongoose.Schema({ name: String }));
async function* streamUsers() {
const cursor = Model.find().cursor();
for await (const doc of cursor) {
// Yielding each doc behaves like multiple implicit responses, if you have
// a framework that supports it.
yield doc;
以下是如何使用 Node.js 编写一个 Websocket 服务器:
const WebSocket = require('ws');
const server = new WebSocket.Server({
port: 8080
server.on('connection', function(socket) {
socket.on('message', function(msg) {
// Handle message
至此,接下来要做的是为 websocket 服务器添加 streamUsers() 方法。假设收到的每条消息都是有效的 JSON,并且都有属性 action 和 id。当 action === 'streamUsers'时,streamUsers() 就会被执行,并且基于 socket 向外发送每个被 Mongoose cursor 查询出来的用户。
const WebSocket = require('ws');
const server = new WebSocket.Server({
port: 8080
server.on('connection', function(socket) {
socket.on('message', function(msg) {
msg = JSON.parse(msg);
if (msg.action === 'streamUsers') {
void async function() {
// Send 1 message per user, as opposed to loading all users and then
// sending them all in 1 message.
for await (const doc of streamUsers()) {
socket.send(JSON.stringify({ id: msg.id, doc }));
}().catch(err => socket.send(JSON.stringify({ id: msg.id, error: err.message })));
以下是如何通过 websocket 客户端调用 streamUsers() 方法:
const client = new WebSocket('ws://localhost:8080');
// Will print each user doc 1 at a time.
client.on('message', msg => console.log(msg));
await new Promise(resolve => client.once('open', resolve));
client.send(JSON.stringify({ action: 'streamUsers', id: 1 }));
后续
异步 generator 函数提供了一种创建更高级的,如同一些 HTTP 框架(例如 Hapi 和 Fastify)那样,基于隐式响应的 websocket 框架的机会。而隐式响应的主要优势就在于,你在业务逻辑中不需要关注框架是通过 websocket,HTTP 轮询或是其他某种方式来发送结果。框架自由式 Javascript 编程更轻便并且更容易测试。
通过将所有产生的值存放在一个数组中,或者让客户端发起多次请求对一个指针进行迭代,streamUsers() 方法就可以很容易的在一个 HTTP 框架,或者是一个使用轮询的 HTTP 框架中重用。没有异步 generator 函数,所有这些都是不能实现的。
热门跟贴