济南php网站开发使用element做的网站
news/
2025/10/4 10:10:23/
文章来源:
济南php网站开发,使用element做的网站,安徽茶叶学会 网站建设,推广 电子商务网站建设Mars 是一个并行和分布式 Python 框架#xff0c;能轻松把单机大家耳熟能详的的 numpy、pandas、scikit-learn 等库#xff0c;以及 Python 函数利用多核或者多机加速。这其中#xff0c;并行和分布式 Python 函数主要利用 Mars Remote API。
启动 Mars 分布式环境可以参考…Mars 是一个并行和分布式 Python 框架能轻松把单机大家耳熟能详的的 numpy、pandas、scikit-learn 等库以及 Python 函数利用多核或者多机加速。这其中并行和分布式 Python 函数主要利用 Mars Remote API。
启动 Mars 分布式环境可以参考
命令行方式在集群中部署。Kubernetes 中部署。MaxCompute 开箱即用的环境购买了 MaxCompute 服务的可以直接使用。
如何使用 Mars Remote API
使用 Mars Remote API 非常简单只需要对原有的代码做少许改动就可以分布式执行。
拿用蒙特卡洛方法计算 π 为例。代码如下我们编写了两个函数calc_chunk 用来计算每个分片内落在圆内的点的个数calc_pi 用来把多个分片 calc_chunk 计算的结果汇总最后得出 π 值。
from typing import List
import numpy as npdef calc_chunk(n: int, i: int):# 计算n个随机点x和y轴落在-1到1之间到原点距离小于1的点的个数rs np.random.RandomState(i)a rs.uniform(-1, 1, size(n, 2))d np.linalg.norm(a, axis1)return (d 1).sum()def calc_pi(fs: List[int], N: int):# 将若干次 calc_chunk 计算的结果汇总计算 pi 的值return sum(fs) * 4 / NN 200_000_000
n 10_000_000fs [calc_chunk(n, i)for i in range(N // n)]
pi calc_pi(fs, N)
print(pi)
%%time 下可以看到结果
3.1416312
CPU times: user 9.47 s, sys: 2.62 s, total: 12.1 s
Wall time: 12.3 s
在单机需要 12.3 s。
要让这个计算使用 Mars Remote API 并行起来我们不需要对函数做任何改动需要变动的仅仅是最后部分。
import mars.remote as mr# 函数调用改成 mars.remote.spawn
fs [mr.spawn(calc_chunk, args(n, i))for i in range(N // n)]
# 把 spawn 的列表传入作为参数再 spawn 新的函数
pi mr.spawn(calc_pi, args(fs, N))
# 通过 execute() 触发执行fetch() 获取结果
print(pi.execute().fetch())
%%time 下看到结果
3.1416312
CPU times: user 29.6 ms, sys: 4.23 ms, total: 33.8 ms
Wall time: 2.85 s
结果一模一样但是却有数倍的性能提升。
可以看到对已有的 Python 代码Mars remote API 几乎不需要做多少改动就能有效并行和分布式来加速执行过程。
一个例子
为了让读者理解 Mars Remote API 的作用我们从另一个例子开始。现在我们有一个数据集我们希望对它们做一个分类任务。要做分类我们有很多算法和库可以选择这里我们用 RandomForest、LogisticRegression以及 XGBoost。
困难的地方是除了有多个模型选择这些模型也会包含多个超参那哪个超参效果最好呢对于调参不那么有经验的同学跑过了才知道。所以我们希望能生成一堆可选的超参然后把他们都跑一遍看看效果。
准备数据
这个例子里我们使用 otto 数据集。
首先我们准备数据。读取数据后我们按 21 的比例把数据分成训练集和测试集。
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_splitdef gen_data():df pd.read_csv(otto/train.csv)X df.drop([target, id], axis1)y df[target]label_encoder LabelEncoder()label_encoder.fit(y)y label_encoder.transform(y)return train_test_split(X, y, test_size0.33, random_state123)X_train, X_test, y_train, y_test gen_data()
模型
接着我们使用 scikit-learn 的 RandomForest 和 LogisticRegression 来处理分类。
RandomForest
from sklearn.ensemble import RandomForestClassifierdef random_forest(X_train: pd.DataFrame, y_train: pd.Series, verbose: bool False,**kw):model RandomForestClassifier(verboseverbose, **kw)model.fit(X_train, y_train)return model
接着我们生成供 RandomForest 使用的超参我们用 yield 的方式来迭代返回。
def gen_random_forest_parameters():for n_estimators in [50, 100, 600]:for max_depth in [None, 3, 15]:for criterion in [gini, entropy]:yield {n_estimators: n_estimators,max_depth: max_depth,criterion: criterion}
LogisticRegression 也是这个过程。我们先定义模型。
from sklearn.linear_model import LogisticRegressiondef logistic_regression(X_train: pd.DataFrame,y_train: pd.Series,verbose: bool False,**kw):model LogisticRegression(verboseverbose, **kw)model.fit(X_train, y_train)return model
接着生成供 LogisticRegression 使用的超参。
def gen_lr_parameters():for penalty in [l2, none]:for tol in [0.1, 0.01, 1e-4]:yield {penalty: penalty,tol: tol}
XGBoost 也是一样我们用 XGBClassifier 来执行分类任务。
from xgboost import XGBClassifierdef xgb(X_train: pd.DataFrame,y_train: pd.Series,verbose: bool False,**kw):model XGBClassifier(verbosityint(verbose), **kw)model.fit(X_train, y_train)return model
生成一系列超参。
def gen_xgb_parameters():for n_estimators in [100, 600]:for criterion in [gini, entropy]:for learning_rate in [0.001, 0.1, 0.5]:yield {n_estimators: n_estimators,criterion: criterion,learning_rate: learning_rate}
验证
接着我们编写验证逻辑这里我们使用 log_loss 来作为评价函数。
from sklearn.metrics import log_lossdef metric_model(model, X_test: pd.DataFrame,y_test: pd.Series) - float:if isinstance(model, bytes):model pickle.loads(model)y_pred model.predict_proba(X_test)return log_loss(y_test, y_pred)def train_and_metric(train_func,train_params: dict,X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series,verbose: bool False):# 把训练和验证封装到一起model train_func(X_train, y_train, verboseverbose, **train_params)metric metric_model(model, X_test, y_test)return model, metric
找出最好的模型
做好准备工作后我们就开始来跑模型了。针对每个模型我们把每次生成的超参们送进去训练除了这些超参我们还把 n_jobs 设成 -1这样能更好利用单机的多核。
results []# -------------
# Random Forest
# -------------for params in gen_random_forest_parameters():print(fcalculating on {params})# fixed random_stateparams[random_state] 123# use all CPU coresparams[n_jobs] -1model, metric train_and_metric(random_forest, params,X_train, y_train,X_test, y_test)print(fmetric: {metric})results.append({model: model, metric: metric})# -------------------
# Logistic Regression
# -------------------for params in gen_lr_parameters():print(fcalculating on {params})# fixed random_stateparams[random_state] 123# use all CPU coresparams[n_jobs] -1model, metric train_and_metric(logistic_regression, params,X_train, y_train,X_test, y_test)print(fmetric: {metric})results.append({model: model, metric: metric})# -------
# XGBoost
# -------for params in gen_xgb_parameters():print(fcalculating on {params})# fixed random_stateparams[random_state] 123# use all CPU coresparams[n_jobs] -1model, metric train_and_metric(xgb, params,X_train, y_train,X_test, y_test)print(fmetric: {metric})results.append({model: model, metric: metric})
运行一下需要相当长时间我们省略掉一部分输出内容。
calculating on {n_estimators: 50, max_depth: None, criterion: gini}
metric: 0.6964123781828575
calculating on {n_estimators: 50, max_depth: None, criterion: entropy}
metric: 0.6912312790832288
# 省略其他模型的输出结果
CPU times: user 3h 41min 53s, sys: 2min 34s, total: 3h 44min 28s
Wall time: 31min 44s
从 CPU 时间和 Wall 时间能看出来这些训练还是充分利用了多核的性能。但整个过程还是花费了 31 分钟。
使用 Remote API 分布式加速
现在我们尝试使用 Remote API 通过分布式方式加速整个过程。
集群方面我们使用最开始说的第三种方式直接在 MaxCompute 上拉起一个集群。大家可以选择其他方式效果是一样的。
n_cores 8
mem 2 * n_cores # 16G
# o 是 MaxCompute 入口这里创建 10 个 worker 的集群每个 worker 8核16G
cluster o.create_mars_cluster(10, n_cores, mem, imageextended)
为了方便在分布式读取数据我们对数据处理稍作改动把数据上传到 MaxCompute 资源。对于其他环境用户可以考虑 HDFS、Aliyun OSS 或者 Amazon S3 等存储。
if not o.exist_resource(otto_train.csv):with open(otto/train.csv) as f:# 上传资源o.create_resource(otto_train.csv, file, fileobjf)def gen_data():# 改成从资源读取df pd.read_csv(o.open_resource(otto_train.csv))X df.drop([target, id], axis1)y df[target]label_encoder LabelEncoder()label_encoder.fit(y)y label_encoder.transform(y)return train_test_split(X, y, test_size0.33, random_state123)
稍作改动之后我们使用 mars.remote.spawn 方法来让 gen_data 调度到集群上运行。
import mars.remote as mr# n_output 说明是 4 输出
# execute() 执行后数据会读取到 Mars 集群内部
data mr.ExecutableTuple(mr.spawn(gen_data, n_output4)).execute()
# remote_ 开头的都是 Mars 对象这时候数据在集群内这些对象只是引用
remote_X_train, remote_X_test, remote_y_train, remote_y_test data
目前 Mars 能正确序列化 numpy ndarray、pandas DataFrame 等还不能序列化模型所以我们要对 train_and_metric 稍作改动把模型 pickle 了之后再返回。
def distributed_train_and_metric(train_func,train_params: dict,X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series,verbose: bool False):model, metric train_and_metric(train_func, train_params,X_train, y_train, X_test, y_test, verboseverbose)return pickle.dumps(model), metric
后续 Mars 支持了序列化模型后可以直接 spawn 原本的函数。
接着我们就对前面的执行过程稍作改动把函数调用全部都用 mars.remote.spawn 来改写。
import numpy as nptasks []
models []
metrics []# -------------
# Random Forest
# -------------for params in gen_random_forest_parameters():# fixed random_stateparams[random_state] 123task mr.spawn(distributed_train_and_metric,args(random_forest, params,remote_X_train, remote_y_train,remote_X_test, remote_y_test), kwargs{verbose: 2},n_output2)tasks.extend(task)# 把模型和评价分别存储models.append(task[0])metrics.append(task[1])# -------------------
# Logistic Regression
# -------------------for params in gen_lr_parameters():# fixed random_stateparams[random_state] 123task mr.spawn(distributed_train_and_metric,args(logistic_regression, params,remote_X_train, remote_y_train,remote_X_test, remote_y_test), kwargs{verbose: 2},n_output2)tasks.extend(task)# 把模型和评价分别存储models.append(task[0])metrics.append(task[1])# -------
# XGBoost
# -------for params in gen_xgb_parameters():# fixed random_stateparams[random_state] 123# 再指定并发为核的个数params[n_jobs] n_corestask mr.spawn(distributed_train_and_metric,args(xgb, params,remote_X_train, remote_y_train,remote_X_test, remote_y_test), kwargs{verbose: 2},n_output2)tasks.extend(task)# 把模型和评价分别存储models.append(task[0])metrics.append(task[1])# 把顺序打乱目的是能分散到 worker 上平均一点
shuffled_tasks np.random.permutation(tasks)
_ mr.ExecutableTuple(shuffled_tasks).execute()
可以看到代码几乎一致。
运行查看结果
CPU times: user 69.1 ms, sys: 10.9 ms, total: 80 ms
Wall time: 1min 59s
时间一下子从 31 分钟多来到了 2 分钟提升 15x。但代码修改的代价可以忽略不计。
细心的读者可能注意到了分布式运行的代码中我们把模型的 verbose 给打开了在分布式环境下因为这些函数远程执行打印的内容只会输出到 worker 的标准输出流我们在客户端不会看到打印的结果但 Mars 提供了一个非常有用的接口来让我们查看每个模型运行时的输出。
以第0个模型为例我们可以在 Mars 对象上直接调用 fetch_log 方法。
print(models[0].fetch_log())
输出我们简略一部分。
building tree 1 of 50
building tree 2 of 50
building tree 3 of 50
building tree 4 of 50
building tree 5 of 50
building tree 6 of 50
# 中间省略
building tree 49 of 50
building tree 50 of 50
要看哪个模型都可以通过这种方式。试想下如果没有 fetch_log API你确想看中间过程的输出有多麻烦。首先这个函数在哪个 worker 上执行不得而知然后即便知道是哪个 worker因为每个 worker 上可能有多个函数执行这些输出就可能混杂在一起甚至被庞大日志淹没了。fetch_log 接口让用户不需要关心在哪个 worker 上执行也不用担心日志混合在一起。
想要了解 fetch_log 接口可以查看 文档。
还有更多
Mars Remote API 的能力其实不止这些举个例子在 remote 内部可以 spawn 新的函数也可以调用 Mars tensor、DataFrame 或者 learn 的算法。这些内容读者们可以先行探索后续我们再写别的文章介绍。
总结
Mars Remote API 通过并行和分布式 Python 函数用很小的修改代价极大提升了执行效率。
原文链接 本文为阿里云原创内容未经允许不得转载。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/927003.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!