233
云计算
负载均衡
产品简介
产品价格
快速入门
用户指南
实践操作
常见问题
购买指南
云服务器
产品简介
计费规则
购买指导
控制台使用指南
常见问题
云数据库
产品简介
计费规则
购买指导
快速入门
连接实例
控制台使用指南
对象存储
产品简介
控制台使用指南
程序员百科
Python
在BML平台使用并行文件系统PFS和对象存储BOS

产品推荐:

1、安全稳定的云服务器租用,2核/2G/5M仅37元,点击抢购>>>

2、高防物理服务器20核/16G/50M/200G防御仅350元,点击抢购>>>

3、百度智能建站(五合一网站)仅880元/年,点击抢购>>> 模板建站(PC+手机站)仅480元/年,点击抢购>>>


点击这里点击这里申请百度智能云VIP帐号,立即体验BML>>>

在BML平台使用并行文件系统PFS和对象存储BOS

在BML平台使用并行文件系统PFS

平台支持用户在用户资源池上关联并行文件存储PFS作为建模任务时的数据存储,当前支持使用并行文件系统PFS提交的任务:

  • 自定义作业-训练作业任务、自动搜索作业任务

前提条件

  1. 用户在平台上已经挂载了容器引擎CCE资源作为用户资源池,点击了解容器引擎CCE;

image.png

  1. 用户已经创建了并行文件系统PFS,点击了解并行文件系统PFS。
  2. 并行文件系统PFS能够被容器引擎CCE资源访问到,也即能被对应的VPC访问到。

在用户资源池中挂载PFS实例

  • Step1:进入平台管理-资源池管理,已挂载运行正常的用户资源池支持“挂载PFS”的操作项,点击即可选择挂载的PFS实例。

image.png

image.png

  • Step2:完成引入PFS后,支持点击“查看PFS”,并支持卸载操作。

image.png

image.png

使用PFS作为数据来源提交作业建模任务

在提交作业建模任务时,选择用户资源池后,支持选择该资源池中引入的BOS存储或者PFS存储作为数据来源。

挂载数据和编辑代码时需要注意:平台仅能使用挂载路径下的文件,请确保您的相关文件在此挂载路径之下。

image.png

操作及代码示例

前提条件

平台上的用户资源池已经成功挂载PFS。

操作

依次点击 自定义作业 -> 训练作业 -> 新建

选择了用户资源池之后,算法配置信息填写如下:

0ee28d968040f3261f115db1fe27b7c1.png

以代码文件为例: 找到机器中pfs的挂载点,将代码及数据集上传到pfs挂载中,代码文件中只需要填写基于挂载路径的相对路径即可。

9a677c8e011c1ef0acb9fe97362592fa.png

demo2.py内容附后, 为官网提供的多节点paddle2.1.1的demo案例,我们只需要修改输出路径为pfs中基于挂载路径的相对路径即可。

9acdee9dd9143be6cdfe111ce0340386.png

代码示例

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
"""
import os
import gzip
import struct
import numpy as np
from PIL import Image
import time
import paddle
import paddle.distributed.fleet as fleet
import paddle.static.nn as nn
import paddle.fluid as fluid
from paddle.io import Dataset
TEST_IMAGE = 't10k-images-idx3-ubyte.gz'
TEST_LABEL = 't10k-labels-idx1-ubyte.gz'
TRAIN_IMAGE = 'train-images-idx3-ubyte.gz'
TRAIN_LABEL = 'train-labels-idx1-ubyte.gz'
class MNIST(Dataset):
    """
    MNIST
    """
    def __init__(self,
                 data_dir=None,
                 mode='train',
                 transform=None,
                 backend=None):
        assert mode.lower() in ['train', 'test'],                 "mode should be 'train' or 'test', but got {}".format(mode)
        if backend is None:
            backend = paddle.vision.get_image_backend()
        if backend not in ['pil', 'cv2']:
            raise ValueError(
                "Expected backend are one of ['pil', 'cv2'], but got {}"
                .format(backend))
        self.backend = backend
        self.mode = mode.lower()
        if self.mode == 'train':
            self.image_path = os.path.join(data_dir, TRAIN_IMAGE)
            self.label_path = os.path.join(data_dir, TRAIN_LABEL)
        else:
            self.image_path = os.path.join(data_dir, TEST_IMAGE)
            self.label_path = os.path.join(data_dir, TEST_LABEL)
        self.transform = transform
        # read dataset into memory
        self._parse_dataset()
        self.dtype = paddle.get_default_dtype()
    def _parse_dataset(self, buffer_size=100):
        self.images = []
        self.labels = []
        with gzip.GzipFile(self.image_path, 'rb') as image_file:
            img_buf = image_file.read()
            with gzip.GzipFile(self.label_path, 'rb') as label_file:
                lab_buf = label_file.read()
                step_label = 0
                offset_img = 0
                # read from Big-endian
                # get file info from magic byte
                # image file : 16B
                magic_byte_img = '>IIII'
                magic_img, image_num, rows, cols = struct.unpack_from(
                    magic_byte_img, img_buf, offset_img)
                offset_img += struct.calcsize(magic_byte_img)
                offset_lab = 0
                # label file : 8B
                magic_byte_lab = '>II'
                magic_lab, label_num = struct.unpack_from(magic_byte_lab,
                                                          lab_buf, offset_lab)
                offset_lab += struct.calcsize(magic_byte_lab)
                while True:
                    if step_label >= label_num:
                        break
                    fmt_label = '>' + str(buffer_size) + 'B'
                    labels = struct.unpack_from(fmt_label, lab_buf, offset_lab)
                    offset_lab += struct.calcsize(fmt_label)
                    step_label += buffer_size
                    fmt_images = '>' + str(buffer_size * rows * cols) + 'B'
                    images_temp = struct.unpack_from(fmt_images, img_buf,
                                                     offset_img)
                    images = np.reshape(images_temp, (buffer_size, rows *
                                                      cols)).astype('float32')
                    offset_img += struct.calcsize(fmt_images)
                    for i in range(buffer_size):
                        self.images.append(images[i, :])
                        self.labels.append(
                            np.array([labels[i]]).astype('int64'))
    def __getitem__(self, idx):
        image, label = self.images[idx], self.labels[idx]
        image = np.reshape(image, [28, 28])
        if self.backend == 'pil':
            image = Image.fromarray(image.astype('uint8'), mode='L')
        if self.transform is not None:
            image = self.transform(image)
        if self.backend == 'pil':
            return image, label.astype('int64')
        return image.astype(self.dtype), label.astype('int64')
    def __len__(self):
        return len(self.labels)
def mlp_model():
    """
    mlp_model
    """
    x = paddle.static.data(name="x", shape=[64, 28, 28], dtype='float32')
    y = paddle.static.data(name="y", shape=[64, 1], dtype='int64')
    x_flatten = paddle.reshape(x, [64, 784])
    fc_1 = nn.fc(x=x_flatten, size=128, activation='tanh')
    fc_2 = nn.fc(x=fc_1, size=128, activation='tanh')
    prediction = nn.fc(x=[fc_2], size=10, activation='softmax')
    cost = paddle.fluid.layers.cross_entropy(input=prediction, label=y)
    acc_top1 = paddle.metric.accuracy(input=prediction, label=y, k=1)
    avg_cost = paddle.mean(x=cost)
    res = [x, y, prediction, avg_cost, acc_top1]
    return res
def train(epoch, exe, train_dataloader, cost, acc):
    """
    train
    """
    total_time = 0
    step = 0
    for data in train_dataloader():
        step += 1
        start_time = time.time()
        loss_val, acc_val = exe.run(
        paddle.static.default_main_program(),
        feed=data, fetch_list=[cost.name, acc.name])
        if step % 100 == 0:
            end_time = time.time()
            total_time += (end_time - start_time)
            print(
                    "epoch: %d, step:%d, train_loss: %f, train_acc: %f, total time cost = %f, speed: %f"
                % (epoch, step, loss_val[0], acc_val[0], total_time,
                1 / (end_time - start_time) ))
def test(exe, test_dataloader, cost, acc):
    """
    test
    """
    total_time = 0
    step = 0
    for data in test_dataloader():
        step += 1
        start_time = time.time()
        loss_val, acc_val = exe.run(
        paddle.static.default_main_program(),
        feed=data, fetch_list=[cost.name, acc.name])
        if step % 100 == 0:
            end_time = time.time()
            total_time += (end_time - start_time)
            print(
                    "step:%d, test_loss: %f, test_acc: %f, total time cost = %f, speed: %f"
                % (step, loss_val[0], acc_val[0], total_time,
                1 / (end_time - start_time) ))
def save(save_dir, feed_vars, fetch_vars, exe):
    """
    save
    """
    path_prefix = os.path.join(save_dir, 'model')
    if fleet.is_first_worker():
        paddle.static.save_inference_model(path_prefix, feed_vars, fetch_vars, exe)
if __name__ == '__main__':
    # 设置训练集路径
    train_data = './job_model/paddle2.1.1/train_data'
    # 设置验证集路径
    test_data = './job_model/paddle2.1.1/train_data'
    # 设置输出路径
    save_dir = './job_model/paddle2.1.1/output'
    # 设置迭代轮数
    epochs = 10
    # 设置验证间隔轮数
    test_interval = 2
    # 设置模型保存间隔轮数
    save_interval = 2
    paddle.enable_static()
    paddle.vision.set_image_backend('cv2')
    # 训练数据集
    train_dataset = MNIST(data_dir=train_data, mode='train')
    # 验证数据集
    test_dataset = MNIST(data_dir=test_data, mode='test')
    # 设置模型
    [x, y, pred, cost, acc] = mlp_model()
    place = paddle.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))
    # 数据加载
    train_dataloader = paddle.io.DataLoader(
        train_dataset, feed_list=[x, y], drop_last=True,
        places=place, batch_size=64, shuffle=True, return_list=False)
    test_dataloader = paddle.io.DataLoader(
        test_dataset, feed_list=[x, y], drop_last=True,
        places=place, batch_size=64, return_list=False)
    # fleet初始化
    strategy = fleet.DistributedStrategy()
    fleet.init(is_collective=True, strategy=strategy)
    # 设置优化器
    optimizer = paddle.optimizer.Adam()
    optimizer = fleet.distributed_optimizer(optimizer)
    optimizer.minimize(cost)
    exe = paddle.static.Executor(place)
    exe.run(paddle.static.default_startup_program())
    prog = paddle.static.default_main_program()
    for epoch in range(epochs):
        train(epoch, exe, train_dataloader, cost, acc)
        if epoch % test_interval == 0:
            test(exe, test_dataloader, cost, acc)
        # save model
        if epoch % save_interval == 0:
            save(save_dir, [x], [pred], exe)

资源配置 (依据代码情况填写即可,上文中的案例用到了多节点和GPU资源)

a7c4fa11ef7057be47b65eac75c2a380.png


这条帮助是否解决了您的问题? 已解决 未解决

提交成功!非常感谢您的反馈,我们会继续努力做到更好! 很抱歉未能解决您的疑问。我们已收到您的反馈意见,同时会及时作出反馈处理!