Python中如何使用parallel Python进行并行计算?
最近在学习使用 parallel python 模块进行分布式机器学习,遇到了一个问题:写了一个创建学习模型的函数,是分发到各计算节点上的,如下面的代码所示。Param 是一个字典,用于传送要构建模型的宏参数,当我不使用它使用默认的参数时,能够正常工作;当把两个#号去掉,使用传过来的参数时,就不能正常工作了。问一下我的代码错在了哪里?
ModelName = "LGBMRegressor"
params = { “max_depth”: 1, “num_leaves”: 4}
def CreateModel( ModelName, Params ):
max_depth, num_leaves = 10, 20
#if “max_depth” in Params: max_depth = Params[ “max_depth” ]
#if “num_leaves” in Params: num_leaves = Params[ “num_leaves” ]
Model, IsClassifier = None, True
ModelName = ModelName.lower()
if ModelName == “lgbmregressor”:
IsClassifier = False
Model = lightgbm.LGBMRegressor( max_depth=max_depth, num_leaves=num_leaves )
return Model, IsClassifier
另外我若在本地创建好模型,把模型分发给各计算节点,程序就不能正常工作,如果在节点上创建模型,如上所示,就出现了上面的情况,该怎么解决?
Python中如何使用parallel Python进行并行计算?
各位请注意,代码传上去之后没有缩进了,实际上是有的
# 使用pp模块进行并行计算的基本示例
import pp
# 定义要并行执行的计算函数
def calculate_square(n):
return n * n
# 创建任务服务器,自动检测CPU核心数
job_server = pp.Server()
# 提交多个任务到并行队列
jobs = []
for i in range(10):
# 异步提交任务,指定函数、参数和依赖模块
job = job_server.submit(calculate_square, (i,), (), ("math",))
jobs.append(job)
# 收集结果
results = [job() for job in jobs]
print(f"计算结果: {results}")
# 获取执行统计信息
print(f"使用核心数: {job_server.get_ncpus()}")
print(f"任务统计: {job_server.print_stats()}")
要使用Parallel Python(pp),首先安装:pip install pp。核心步骤:
- 定义独立计算函数
- 创建
pp.Server()实例 - 用
submit()提交任务,参数格式:(函数, 参数元组, 依赖函数元组, 依赖模块元组) - 通过
job()获取异步结果
pp支持跨机器集群,只需在Server()指定IP列表。注意被并行函数不能有副作用,且所有依赖需显式声明。
简单说就是定义函数、提交任务、收集结果三步。
大小写
希望 v2 能改善一下对代码显示的支持吧,毕竟好歹也是程序员技术社区啊
发帖能用 Markdown,评论能用 gist ?还不完善??
#3 markdown 不行么。。。都说了是‘程序员技术社区’,那么 markdown 应该都会一点吧。。。就算不会也可以编辑器写完复制进来吧
#6 评论涉及到代码就‘ gist ’了吧。发帖 markdown 可用可不用,没有人强制要用 md,但是为了看的更清晰最好还是用 md 吧
是我孤陋寡闻了,主要是看很多帖子都是直接复制代码,使得排版全无,以为没这些东西的…
if 语句格式不是要换行?
if “max_depth” in Params:
max_depth = Params[ “max_depth” ]
不能正常工作是 python 报错还是结果不符合预期?
他前面定义的是小写,后面判断用的是大写,是两个变量
回 ant2017:换行也不行,试了。
回 wwg1994: 不是 python 报错,是不出结果。并且 CPU 占用率是 0
各位有点新进展,我把函数添加了一项,然后使用 xgboost 就没有问题。是不是 lightGBM 要依赖一些别的模块? parallel Python 要求把必要的引用显式地写在代码中。添加后的相关代码如下:
def CreateModel( ModelName, Params ):
max_depth, num_leaves, min_child_weight = 10, 20, 1
if “max_depth” in Params: max_depth = Params[ “max_depth” ]
if “num_leaves” in Params: num_leaves = Params[ “num_leaves” ]
if “min_child_weight” in Params: min_child_weight = Params[ “min_child_weight” ]
Model, IsClassifier = None, True
ModelName = ModelName.lower()
if ModelName == “lgbmregressor”:
IsClassifier = False
Model = lightgbm.LGBMRegressor( max_depth=max_depth, num_leaves=num_leaves )
elif ModelName == “xgbregressor”:
IsClassifier = False;
Model = xgboost.XGBRegressor( n_jobs=16, max_depth=max_depth, min_child_weight=min_child_weight )
return Model, IsClassifier

