Nodejs 怎么实现并行化执行传输不可序列化对象?
现在的需求是存在一个类内的方法,原来是串行的,现在需要改并行。现在我需要将一个对象 obj ,或者对象的方法 obj.run 传入子线程,然后回调执行。
但是我尝试了几种方式,似乎是没办法将复杂的对象进行传递?以至于常规的回调函数的方式没办法在 node 的并行化中实现。
- worker_thread
new Worker(moduleThreadFile,
workerData:{'obj':obj}) //ERROR
会报告 Cannot set property code of which has only a getter.
- workerpool
pool=workerpool.pool()
pool.exec(obj,[])
实际上传入的 obj 在子线程中是 undefined
Nodejs 怎么实现并行化执行传输不可序列化对象?
维护一个 map ,用 id 来区分哪个任务完成了,触发相应回调。
#1 能简单举例一些伪代码来做示例吗? node 用得少。
没用过 workerpool ,但是,以我的理解
pool.exec(obj,[])
exec 第一个参数应该是一个 function ?,第二个参数你写了空数组的[],应该是传给这个 function 的参数列表?
传不了,看文档,写的清楚的不能更清楚了,用什么工具之前多看文档,多看文档
看文档,worker 能传递的数据格式有要求的
#3 function 也试过,没区别
#4 我想我需要的什么工具能支持传输完整原始对象的方法。
bluebird.map()比较方便实现并行,还能设置并发量 concurrency
http://bluebirdjs.com/docs/api/promise.map.html
#5 这说明了 workerData 不能传输这种对象。那么有其他方法可以实现吗?
workerData <any> Any JavaScript value that is cloned and made available as require(‘node:worker_threads’).workerData. The cloning occurs as described in the HTML structured clone algorithm, and an error is thrown if the object cannot be cloned (e.g. because it contains functions).
—
一定要通过子线程的方案来实现吗?把需要执行的方法改造成异步的形式是不是也可以。
#11 这个是计算密集任务
#10
你如果一定要传递这个对象参数,最直接的就是把这个对象,转成符合要求的数据类型。
比如把对象拍平,把值弄成数组传递
#9 看起来是个异步模型。我这个可能需要的是计算密集型加速方案。
#13 无法实现,对象非常复杂。
#15 对象可以序列化存储吗?可以的话直接写文件或者写数据库,worker 自己去读
确实,bluebird.map()适合 io 密集型任务的并行执行,计算密集型不合适。
计算密集型需要调用 cpu 多线程,可以通过 worker_threads 实现
主线程分配任务给 worker 线程,最简单就是传递字符串
const tasks = [<br>function taskA() { <br> // do something<br>}<br>
,<br>function taskB() { <br> // do something<br>}<br>
]
worker 线程接收到用 eval 语法执行 taskA 、taskB ,把执行结果返回给主线程。
题主的需求大概率实现不了,Node 的 Worker 是并行用多进程实现的,那哪些对象是没法序列化的?文件句柄,线程句柄,tcp/udp 连接句柄,而这些资源在进程间都是隔离的,即便是强行序列化传过去也用不了,当然 Linux 似乎有方式实现进程间资源共享,最好的方式还是支持基于内存通信的多线程的语言 go java c++等
我的天,有这么难吗,为什么一定要让线程去执行回调,线程把任务完成后通知主不可以吗?
没实践过,不知道有没有别的坑,思路就是把函数转为字符串,worker 中再把字符串转回来js<br>const { Worker, isMainThread, parentPort, workerData } = require('node:worker_threads');<br><br>if (isMainThread) {<br> const obj = {<br> name: 'Foo',<br> <br> greet(other) {<br> return `Hello ${<a target="_blank" href="http://this.name" rel="nofollow noopener">this.name</a>} and ${other}`;<br> }<br> }<br> <br> const objStr = JSON.stringify(obj, (key, value) => {<br> if (typeof value === 'function') {<br> return value.toString();<br> }<br> return value;<br> });<br><br> const worker = new Worker(__filename, {<br> workerData: objStr,<br> });<br> worker.on('message', (value) => {<br> console.log('Receive data from worker =>', value);<br> });<br> worker.on('error', console.error);<br> worker.on('exit', (code) => {<br> if (code !== 0)<br> console.error(new Error(`Worker stopped with exit code ${code}`));<br> });<br>} else {<br> const objStr = workerData;<br> const objParsed = JSON.parse(objStr);<br><br> const run = (obj, funcName, ...args) => {<br> if (obj.hasOwnProperty(funcName)) {<br> const funcStr = obj[funcName];<br> // 提取函数体,忽略函数参数定义<br> const funcBody = funcStr.substring(obj.greet.indexOf('{') + 1, obj.greet.lastIndexOf('}'));<br> // 使用剩余参数语法来定义一个新的函数,允许接收任意数量的参数<br> const funcArgs = funcStr.substring(funcStr.indexOf('(') + 1, funcStr.indexOf(')')).split(',').map(arg => arg.trim()).filter(arg => arg);<br> const func = new Function(...funcArgs, funcBody);<br> <br> return func.call(obj, ...args);<br> }<br> }<br><br> const result = run(objParsed, 'greet', 'Bar');<br><br> parentPort.postMessage(result);<br>} <br>
#16 JSON.stringfy 试过,不行,函数丢失了。这个对象有很多静态成员和方法。
#20 首先这个工程的主要功能需要编译成一个单文件,由于某些原因可能不会轻易改变这种行为。
此外由于 node 的多线程需要一个单独的 js 文件作为入口,所以我创建了一个新工程,因此这个工程无法直接导入原工程的内容。所以我尝试进行回调。
可以用 module.exports 导出方法,在 worker_threads 里调用吧
#24
> 首先这个工程的主要功能需要编译成一个单文件,由于某些原因可能不会轻易改变这种行为。
此外由于 node 的多线程需要一个单独的 js 文件作为入口,所以我创建了一个新工程,来创建新的 js 文件入口,因此这个工程无法直接导入原工程的内容。所以我尝试进行回调。
worker 不能传函数,你可以一个任务给一个 id ,任务处理完传 id 给主线程,根据 id 执行对应回调。超级简单。
重写一份不绑定对象的,参数可以序列化的 run
别想了,自己实现一个序列化/反序列化方法吧。
你就别当 node 有线程,node 相当于没有线程,先序列化再反序列化,多进程模型
而且 node 里面的 fork, 也不是你认为的 fork, 纯就是重新再启动一个新的
函数和对象引用都是不共享的,也不能传递,只能通信
在 worker 内加载/初始化这个对象不行么?
你的 worker 里面本身要有 [处理计算] 的代码,主线程只是负责把 [要处理的数据] [id] 传给 worker 线程,worker 计算完成之后只负责把计算好的数据 [字符串/基本类型/字节] [id] 发给主线程,主线程收到处理好的数据根据 [id] 再进行下一步处理 [合并数据] 。
#32 > 首先这个工程的主要功能需要编译成一个单文件,由于某些原因可能不会轻易改变这种行为。
此外由于 node 的多线程需要一个单独的 js 文件作为入口,所以我创建了一个新工程,来创建新的 js 文件入口,因此这个工程无法直接导入原工程的内容。
基于以上原因,实际上 worker 里基本没办法初始化这个对象。或者能提供某种方法可以避免上述的限制吗?
我看到 workerData 支持 Blob, dataview,arraybuffer 类型(The structured clone algorithm
)[https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm#webapi_types]
能不能就这一个 js 文件既是主主入口文件也是 worker 入口文件,然后内部根据 mian/worker 分支判断呢?
#36 这个 js 文件有 10+M ,worker 子线程启动的时候会不会有性能问题。
你编不编译成单文件不重要啊,你都新建一个工程了,那么你的新工程存在两个文件就不可以?文件 A 作为和主交互的入口,并在里面 require 你的单文件 B 执行,B 执行完毕通过 A 把消息传递给主,不就完了吗,下面 26 楼说的也是这个意思
nodejs 中的 worker 是多进程,不是多线程,不能内存共享,正常情况没法传递引用。
但是 worker 支持 transferable 对象,也就是可以直接传递引用避免进程间的数据拷贝,其实还是要自己实现序列/反序列化。
如果你的 obj 不可序列化,那就把 obj 的实例化函数放到 worker 中,通过传递 obj 实例化的相关参数来实现。
源码级传递 —— 你在 worker 侧实现一个一摸一样的类。这样把数据丢过去就能计算了。
有没可能方向就错了, node 适合计算密集型任务?
不懂的能不能不要乱说? worker 什么时候变成进程了?你去 ps 看看进程号不难吧?天天在那误人子弟
进程间还能避免数据拷贝,说话都有逻辑吗?没事看看书吧
想办法序列化对象吧~,或者在通过一些参数,在 worker 里实例化对象
切片上传?
理解错了( ̄▽ ̄)"
官方都说了不允许传递 function ,还在那序列化,序列化,现在程序员的平均水平是真的低
请求删除屏蔽 用户引战,群嘲不友善行为
菜就多练
看起来是在说我,虽然但是,你说的对,是进程不是线程,因为是线程隔离,无法直接传递引用,我的确误人子弟了。
对不起,还是写反了,能理解意思就好😁
在Node.js中,由于JavaScript的单线程特性,实现真正的并行化通常需要借助子进程或工作线程。由于Node.js的主线程(事件循环)不能直接传输不可序列化对象(如函数、某些自定义类实例等),我们可以使用 child_process
模块来创建子进程,并通过消息传递机制间接实现并行化。
以下是一个使用 child_process
模块和 fork
方法来并行化执行任务的示例,其中任务处理不可序列化对象的需求通过子进程隔离环境来实现:
// 主进程 (main.js)
const { fork } = require('child_process');
const child = fork('child.js');
// 发送一个不可序列化的对象(通过消息传递的间接方式处理)
// 实际上,直接发送不可序列化对象会失败,这里仅示意如何设置通信
child.on('message', (msg) => {
console.log('Message from child', msg);
});
child.send({ cmd: 'start', data: 'some serializable data' });
// 子进程 (child.js)
process.on('message', (msg) => {
if (msg.cmd === 'start') {
// 在子进程中处理不可序列化对象
const nonSerializableObj = {
someFunction: function() { console.log('Function in child process'); }
};
// 执行某些操作
nonSerializableObj.someFunction();
// 向主进程发送消息
process.send('Task completed in child');
}
});
在这个例子中,我们利用Node.js的子进程机制来隔离执行环境,从而在子进程中处理不可序列化的对象。注意,实际不可序列化对象不能通过 send
方法直接传递,这里的 data
字段仅为可序列化数据示例。