在 ZeroMQ 的 Response/Request 模式中,Nodejs 服务端回调函数逻辑内调用 fs.readFile 时,为何会阻塞执行?(内有代码)

发布于 1周前 作者 sinazl 来自 nodejs/Nestjs

在 ZeroMQ 的 Response/Request 模式中,Nodejs 服务端回调函数逻辑内调用 fs.readFile 时,为何会阻塞执行?(内有代码)

Node 版本 10.16.0 ZeroMQ 版本 5.1.0 使用 ZeroMQ 的代码如下:

服务端 /zmq-rep.js

"use strict";
const fs = require("fs");
const zmq = require("zeromq");
const moment = require("moment");
const responder = zmq.socket("rep");

responder.on(“message”, data => { let reqData = JSON.parse(data); let filename = reqData.path; console.log(Request received:${filename}); fs.readFile(filename, (err, data) => { if (err) { throw Error(err); } console.log(Sending response for ${filename} at ${moment().format()}); responder.send( JSON.stringify({ content: data.toString(), timestamp: moment().format(“x”), pid: process.pid }) ); }); });

responder.bind(“tcp://127.0.0.1:50221”, err => { if (err) { throw Error(err); } console.log(“Now listening for requests…”); });

process.on(“SIGINT”, () => { console.log(“Now closing server…”); responder.close(); });

客户端 /zmq-req.js

"use strict";
const zmq = require("zeromq");
const moment = require("moment");
const filename = process.argv[2];
const request = zmq.socket("req");
request.on("message", data => {
  let response = JSON.parse(data);
  console.log(
    `Fetch file content:${response.content} at ${
      response.timestamp
    } processed by ${response.pid}`
  );
});
request.connect("tcp://127.0.0.1:50221");

for (let i = 1; i <= 5; i++) { console.log( Sending request to get ${filename} for ${i} time(s) at ${moment().format()} ); request.send( JSON.stringify({ path: filename }) ); }

使用如下方式调用:

# Terminal 1
$ node zmq-rep.js
# Terminal 2
$ node zmq-req.js target.txt

服务端输出如下:

Now listening for requests...
Request received:target.txt
Sending response for target.txt at 2019-07-27T07:36:47+08:00
Request received:target.txt
Sending response for target.txt at 2019-07-27T07:36:47+08:00
Request received:target.txt
Sending response for target.txt at 2019-07-27T07:36:47+08:00
Request received:target.txt
Sending response for target.txt at 2019-07-27T07:36:47+08:00
Request received:target.txt
Sending response for target.txt at 2019-07-27T07:36:47+08:00

客户端输出如下

Sending request to get target.txt for 1 time(s) at 2019-07-27T07:36:47+08:00
Sending request to get target.txt for 2 time(s) at 2019-07-27T07:36:47+08:00
Sending request to get target.txt for 3 time(s) at 2019-07-27T07:36:47+08:00  
Sending request to get target.txt for 4 time(s) at 2019-07-27T07:36:47+08:00
Sending request to get target.txt for 5 time(s) at 2019-07-27T07:36:47+08:00  
Fetch file content:50221 at 1564184207708 processed by 12984
Fetch file content:50221 at 1564184207734 processed by 12984
Fetch file content:50221 at 1564184207747 processed by 12984
Fetch file content:50221 at 1564184207766 processed by 12984
Fetch file content:50221 at 1564184207794 processed by 12984

可以看到,客户端代码发送请求是非阻塞的,但服务端代码是阻塞的。《 Node.js 8 the right way 》对这个现象的解释是:"Node.js event loop was left spinning while the fs.readFile for each request was being processed."但查阅文档,fs.readFile 本身是非阻塞的,如果这一解释成立的话,fs.readFile 不就是阻塞函数了吗?

我另外写了一份只使用 net 模块进行通讯的代码,现象与使用 ZeroMQ 进行通信不同,在这种情况下,服务端的回调并没有阻塞。

服务端 /tcp-server.js

"use strict";
const fs = require("fs");
const net = require("net");
const moment = require("moment");

const server = net .createServer(connection => { console.log(“Request received”); connection.on(“data”, data => { let reqData = JSON.parse(data); console.log(Reading ${reqData.path} at ${moment().format()}); fs.readFile(reqData.path, (err, data) => { connection.write( JSON.stringify({ content: data.toString(), timestamp: moment().format() }) ); }); }); }) .listen({ port: 50221, hostname: “127.0.0.1” }) .on(“listening”, () => { console.log(“Now listening…”); });

客户端 /tcp-client.js

"use strict";
const net = require("net");
const filename = process.argv[2];
const connectionPool = [];

for (let i = 1; i <= 5; i++) { connectionPool.push( net .createConnection(50221, “127.0.0.1”, () => { console.log( Connetcion ${i} established and start fetching file ${filename} on server... ); }) .on(“data”, data => { console.log(Receive response:${data}); }) ); } connectionPool.forEach(connection => { connection.write( JSON.stringify({ path: filename }) ); });

process.on(“SIGINT”, () => { connectionPool.forEach(connection => { connection.end(); }); });

调用如下:

# Terminal 1
$ node tcp-server.js
# Terminal 2
$ node tcp-client.js target.txt

客户端输出如下:

Connetcion 1 established and start fetching file target.txt on server...
Connetcion 2 established and start fetching file target.txt on server...
Connetcion 3 established and start fetching file target.txt on server...      
Connetcion 4 established and start fetching file target.txt on server...      
Connetcion 5 established and start fetching file target.txt on server...      
Receive response:{"content":"50221","timestamp":"2019-07-27T07:44:53+08:00"}
Receive response:{"content":"50221","timestamp":"2019-07-27T07:44:53+08:00"}
Receive response:{"content":"50221","timestamp":"2019-07-27T07:44:53+08:00"}  
Receive response:{"content":"50221","timestamp":"2019-07-27T07:44:53+08:00"}  
Receive response:{"content":"50221","timestamp":"2019-07-27T07:44:53+08:00"}

服务端输出如下:

Now listening...
Request received
Request received
Request received
Request received
Request received
Reading target.txt at 2019-07-27T07:44:53+08:00
Reading target.txt at 2019-07-27T07:44:53+08:00
Reading target.txt at 2019-07-27T07:44:53+08:00
Reading target.txt at 2019-07-27T07:44:53+08:00
Reading target.txt at 2019-07-27T07:44:53+08:00

于是可以看到,不论在服务端还是客户端的 io 都是非阻塞的。

这种不同是否与 ZeroMQ 的实现有关呢?感谢大佬们的回复!


6 回复

zeromq 的 req/rep 模式,server 只能收到 req,然后 rep,再收 req,再 rep。如果要异步,建议用 dealer/router 模式


#1 非常感谢,这和 ZeroMQ response 模式服务端回调的执行方式有关吗?按我的理解在 node 中执行回调不应该都是异步非阻塞的吗?或者说 ZeroMQ 在一个 request 的回调完成之前不会读下一个 request ?

zeromq 有好几种 message model,你说的 request/reply 模式是同步的,就是 server 必须进行一次 reply 之后才能再次接收 client 请求。看下 zeromq 的文档吧。

http://zguide.zeromq.org/page:all#The-Request-Reply-Mechanisms

最近也在看 zmq,req-rep 会阻塞。想不阻塞可以考虑 Push-Pull 模式吧,可能要开两组端口。

可以考虑使用 grpc 之类的呢

在 ZeroMQ 的 Response/Request 模式中,Node.js 服务端如果在回调函数逻辑内调用 fs.readFile 可能会阻塞执行,主要是因为 fs.readFile 是一个同步文件读取操作。尽管 Node.js 是基于事件驱动的异步 I/O 模型,其文件系统模块 fs 提供了同步和异步两种方法。使用同步方法(如 fs.readFile)时,会阻塞事件循环,直到文件读取完成。

为了解决这个问题,应该使用异步的文件读取方法 fs.readFile 的异步版本,即带有回调函数的版本,或者使用 fs.promises.readFile(基于 Promise)。这样可以避免阻塞事件循环,保持 Node.js 的非阻塞特性。

下面是使用异步 fs.readFile 的示例代码:

const zmq = require('zmq');
const fs = require('fs');

const responder = zmq.socket('rep');
responder.bind('tcp://*:5555', function (err) {
  if (err) {
    console.log(err);
  } else {
    console.log('Server listening on port 5555');
  }
});

responder.on('message', function (request) {
  fs.readFile('example.txt', 'utf8', function (err, data) {
    if (err) {
      responder.send('Error reading file');
    } else {
      responder.send(data);
    }
  });
});

在这个例子中,fs.readFile 的第二个参数是编码格式,第三个参数是回调函数。回调函数会在文件读取完成后被调用,从而避免阻塞事件循环。使用这种方法可以确保 ZeroMQ 的 Response/Request 模式保持高效和响应迅速。

回到顶部