分布式tensorflow搭建实践

机器学习,大数据 2018-05-21

工业中面临的常常是庞大的数据计算量,采用单一的tensorflow不能完全解决问题,需要结合hadoop,kafka,GPU,C++ 来进行加速和搭建系统。官方的whl安装包不能满足系统要求,通过源码编译安装来加快运行速度。

在此前请先搭建hadoop环境,安装bazel,我的环境 Ubuntu18.04,python3,hadoop2.7

一. 编译本地tensorflow安装包

1.clone tensorflow源代码到本地

2.进入 tensorflow目录下目录下 执行 ./configure

3.

1.png

需要注意的几点是:指定python3目录,开启jemalloc,hadoop,kafka接口支持(jemalloc用于管理内存分配,如果安装了cuda和MKL加速需要指定目录)

4.运行

bazel build -c opt //tensorflow/tools/pip_package:build_pip_package

编译耗时:至少60分钟

5.指定位置生成安装包并安装

bazel-bin/tensorflow/tools/pip_package/build_pip_package /tmp/tensorflow_pkg   
pip install /tmp/tensorflow_pkg/tensorflow-1.8.0-cp36-cp36m-linux_x86_64.whl

6.分发安装包到其它机器并安装

7.程序使用多块显卡

c = []
for d in ['/device:GPU:2', '/device:GPU:3']:
  with tf.device(d):
    a = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[2, 3])
    b = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[3, 2])
    c.append(tf.matmul(a, b))
with tf.device('/cpu:0'):
  sum = tf.add_n(c)
# 创建一个 session ,并将 log_device_placement 设置为 True。
sess = tf.Session(config=tf.ConfigProto(log_device_placement=True))
# 执行这个操作。
print(sess.run(sum))

8.运行分布式tensorflow程序

import argparse
import sys

import tensorflow as tf

FLAGS = None

def main(_):
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts.split(",")

  # 从参数服务器和工作主机创建一个集群
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

  # 创建并启动本地任务的服务器
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)

  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":

    # 默认情况下将操作分配给本地Worker
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):

      # 建立模型...
      loss = ...
      global_step = tf.contrib.framework.get_or_create_global_step()

      train_op = tf.train.AdagradOptimizer(0.01).minimize(
          loss, global_step=global_step)

    # StopAtStepHook 在运行给定步骤后处理停止
    hooks=[tf.train.StopAtStepHook(last_step=1000000)]

    # MonitoredTrainingSession 负责会话初始化
    # 从检查点恢复,保存到检查点,一旦完成或报错就关闭
    with tf.train.MonitoredTrainingSession(master=server.target,
                                           is_chief=(FLAGS.task_index == 0),
                                           checkpoint_dir="/tmp/train_logs",
                                           hooks=hooks) as mon_sess:
      while not mon_sess.should_stop():
        # 异步运行训练
        # mon_sess.run 在被抢占 PS 的情况下处理 AbortedError
        mon_sess.run(train_op)

if __name__ == "__main__":
  parser = argparse.ArgumentParser()
  parser.register("type", "bool", lambda v: v.lower() == "true")
  # 用于定义 tf.train.ClusterSpec 的标志
  parser.add_argument(
      "--ps_hosts",
      type=str,
      default="",
      help="Comma-separated list of hostname:port pairs"
  )
  parser.add_argument(
      "--worker_hosts",
      type=str,
      default="",
      help="Comma-separated list of hostname:port pairs"
  )
  parser.add_argument(
      "--job_name",
      type=str,
      default="",
      help="One of 'ps', 'worker'"
  )
  # Flags for defining the tf.train.Server
  parser.add_argument(
      "--task_index",
      type=int,
      default=0,
      help="Index of task within the job"
  )
  FLAGS, unparsed = parser.parse_known_args()
  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)

在脚本中启动多个训练:

python trainer.py \
     --ps_hosts=ps0.xxx.com:2222,ps1.xxx.com:2222 \
     --worker_hosts=worker0.xxx.com:2222,worker1.xxx.com:2222 \
     --job_name=ps --task_index=0

如果在hadoop上运行,数据改为读写HDFS文件路径

filename_queue = tf.train.string_input_producer([
    "hdfs://namenode:8020/path/to/file1.csv",
    "hdfs://namenode:8020/path/to/file2.csv",
])

本文由 Tony 创作,采用 知识共享署名 3.0,可自由转载、引用,但需署名作者且注明文章出处。

赏个馒头吧