在BML平台使用并行文件系统PFS和对象存储BOS |
产品推荐: 1、安全稳定的云服务器租用,2核/2G/5M仅37元,点击抢购>>>; 2、高防物理服务器20核/16G/50M/500G防御仅350元,点击抢购>>> 3、百度智能建站(五合一网站)仅880元/年,点击抢购>>> 模板建站(PC+手机站)仅480元/年,点击抢购>>> 4、阿里云服务器2核2G3M仅99元/年、2核4G5M仅199元/年,新老同享,点击抢购>>> 5、腾讯云服务器2核2G4M仅99元/年、新老同享,点击抢购>>> 在BML平台使用并行文件系统PFS和对象存储BOS 在BML平台使用并行文件系统PFS平台支持用户在用户资源池上关联并行文件存储PFS作为建模任务时的数据存储,当前支持使用并行文件系统PFS提交的任务:
前提条件
在用户资源池中挂载PFS实例
使用PFS作为数据来源提交作业建模任务在提交作业建模任务时,选择用户资源池后,支持选择该资源池中引入的BOS存储或者PFS存储作为数据来源。 挂载数据和编辑代码时需要注意:平台仅能使用挂载路径下的文件,请确保您的相关文件在此挂载路径之下。 操作及代码示例前提条件平台上的用户资源池已经成功挂载PFS。 操作依次点击 自定义作业 -> 训练作业 -> 新建 选择了用户资源池之后,算法配置信息填写如下: 以代码文件为例: 找到机器中pfs的挂载点,将代码及数据集上传到pfs挂载中,代码文件中只需要填写基于挂载路径的相对路径即可。 demo2.py内容附后, 为官网提供的多节点paddle2.1.1的demo案例,我们只需要修改输出路径为pfs中基于挂载路径的相对路径即可。 代码示例#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
"""
import os
import gzip
import struct
import numpy as np
from PIL import Image
import time
import paddle
import paddle.distributed.fleet as fleet
import paddle.static.nn as nn
import paddle.fluid as fluid
from paddle.io import Dataset
TEST_IMAGE = 't10k-images-idx3-ubyte.gz'
TEST_LABEL = 't10k-labels-idx1-ubyte.gz'
TRAIN_IMAGE = 'train-images-idx3-ubyte.gz'
TRAIN_LABEL = 'train-labels-idx1-ubyte.gz'
class MNIST(Dataset):
"""
MNIST
"""
def __init__(self,
data_dir=None,
mode='train',
transform=None,
backend=None):
assert mode.lower() in ['train', 'test'], "mode should be 'train' or 'test', but got {}".format(mode)
if backend is None:
backend = paddle.vision.get_image_backend()
if backend not in ['pil', 'cv2']:
raise ValueError(
"Expected backend are one of ['pil', 'cv2'], but got {}"
.format(backend))
self.backend = backend
self.mode = mode.lower()
if self.mode == 'train':
self.image_path = os.path.join(data_dir, TRAIN_IMAGE)
self.label_path = os.path.join(data_dir, TRAIN_LABEL)
else:
self.image_path = os.path.join(data_dir, TEST_IMAGE)
self.label_path = os.path.join(data_dir, TEST_LABEL)
self.transform = transform
# read dataset into memory
self._parse_dataset()
self.dtype = paddle.get_default_dtype()
def _parse_dataset(self, buffer_size=100):
self.images = []
self.labels = []
with gzip.GzipFile(self.image_path, 'rb') as image_file:
img_buf = image_file.read()
with gzip.GzipFile(self.label_path, 'rb') as label_file:
lab_buf = label_file.read()
step_label = 0
offset_img = 0
# read from Big-endian
# get file info from magic byte
# image file : 16B
magic_byte_img = '>IIII'
magic_img, image_num, rows, cols = struct.unpack_from(
magic_byte_img, img_buf, offset_img)
offset_img += struct.calcsize(magic_byte_img)
offset_lab = 0
# label file : 8B
magic_byte_lab = '>II'
magic_lab, label_num = struct.unpack_from(magic_byte_lab,
lab_buf, offset_lab)
offset_lab += struct.calcsize(magic_byte_lab)
while True:
if step_label >= label_num:
break
fmt_label = '>' + str(buffer_size) + 'B'
labels = struct.unpack_from(fmt_label, lab_buf, offset_lab)
offset_lab += struct.calcsize(fmt_label)
step_label += buffer_size
fmt_images = '>' + str(buffer_size * rows * cols) + 'B'
images_temp = struct.unpack_from(fmt_images, img_buf,
offset_img)
images = np.reshape(images_temp, (buffer_size, rows *
cols)).astype('float32')
offset_img += struct.calcsize(fmt_images)
for i in range(buffer_size):
self.images.append(images[i, :])
self.labels.append(
np.array([labels[i]]).astype('int64'))
def __getitem__(self, idx):
image, label = self.images[idx], self.labels[idx]
image = np.reshape(image, [28, 28])
if self.backend == 'pil':
image = Image.fromarray(image.astype('uint8'), mode='L')
if self.transform is not None:
image = self.transform(image)
if self.backend == 'pil':
return image, label.astype('int64')
return image.astype(self.dtype), label.astype('int64')
def __len__(self):
return len(self.labels)
def mlp_model():
"""
mlp_model
"""
x = paddle.static.data(name="x", shape=[64, 28, 28], dtype='float32')
y = paddle.static.data(name="y", shape=[64, 1], dtype='int64')
x_flatten = paddle.reshape(x, [64, 784])
fc_1 = nn.fc(x=x_flatten, size=128, activation='tanh')
fc_2 = nn.fc(x=fc_1, size=128, activation='tanh')
prediction = nn.fc(x=[fc_2], size=10, activation='softmax')
cost = paddle.fluid.layers.cross_entropy(input=prediction, label=y)
acc_top1 = paddle.metric.accuracy(input=prediction, label=y, k=1)
avg_cost = paddle.mean(x=cost)
res = [x, y, prediction, avg_cost, acc_top1]
return res
def train(epoch, exe, train_dataloader, cost, acc):
"""
train
"""
total_time = 0
step = 0
for data in train_dataloader():
step += 1
start_time = time.time()
loss_val, acc_val = exe.run(
paddle.static.default_main_program(),
feed=data, fetch_list=[cost.name, acc.name])
if step % 100 == 0:
end_time = time.time()
total_time += (end_time - start_time)
print(
"epoch: %d, step:%d, train_loss: %f, train_acc: %f, total time cost = %f, speed: %f"
% (epoch, step, loss_val[0], acc_val[0], total_time,
1 / (end_time - start_time) ))
def test(exe, test_dataloader, cost, acc):
"""
test
"""
total_time = 0
step = 0
for data in test_dataloader():
step += 1
start_time = time.time()
loss_val, acc_val = exe.run(
paddle.static.default_main_program(),
feed=data, fetch_list=[cost.name, acc.name])
if step % 100 == 0:
end_time = time.time()
total_time += (end_time - start_time)
print(
"step:%d, test_loss: %f, test_acc: %f, total time cost = %f, speed: %f"
% (step, loss_val[0], acc_val[0], total_time,
1 / (end_time - start_time) ))
def save(save_dir, feed_vars, fetch_vars, exe):
"""
save
"""
path_prefix = os.path.join(save_dir, 'model')
if fleet.is_first_worker():
paddle.static.save_inference_model(path_prefix, feed_vars, fetch_vars, exe)
if __name__ == '__main__':
# 设置训练集路径
train_data = './job_model/paddle2.1.1/train_data'
# 设置验证集路径
test_data = './job_model/paddle2.1.1/train_data'
# 设置输出路径
save_dir = './job_model/paddle2.1.1/output'
# 设置迭代轮数
epochs = 10
# 设置验证间隔轮数
test_interval = 2
# 设置模型保存间隔轮数
save_interval = 2
paddle.enable_static()
paddle.vision.set_image_backend('cv2')
# 训练数据集
train_dataset = MNIST(data_dir=train_data, mode='train')
# 验证数据集
test_dataset = MNIST(data_dir=test_data, mode='test')
# 设置模型
[x, y, pred, cost, acc] = mlp_model()
place = paddle.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))
# 数据加载
train_dataloader = paddle.io.DataLoader(
train_dataset, feed_list=[x, y], drop_last=True,
places=place, batch_size=64, shuffle=True, return_list=False)
test_dataloader = paddle.io.DataLoader(
test_dataset, feed_list=[x, y], drop_last=True,
places=place, batch_size=64, return_list=False)
# fleet初始化
strategy = fleet.DistributedStrategy()
fleet.init(is_collective=True, strategy=strategy)
# 设置优化器
optimizer = paddle.optimizer.Adam()
optimizer = fleet.distributed_optimizer(optimizer)
optimizer.minimize(cost)
exe = paddle.static.Executor(place)
exe.run(paddle.static.default_startup_program())
prog = paddle.static.default_main_program()
for epoch in range(epochs):
train(epoch, exe, train_dataloader, cost, acc)
if epoch % test_interval == 0:
test(exe, test_dataloader, cost, acc)
# save model
if epoch % save_interval == 0:
save(save_dir, [x], [pred], exe) 资源配置 (依据代码情况填写即可,上文中的案例用到了多节点和GPU资源) |